diff --git a/be/src/benchmarks/CMakeLists.txt b/be/src/benchmarks/CMakeLists.txt index 0d2e2da4ab..c56b0c7717 100644 --- a/be/src/benchmarks/CMakeLists.txt +++ b/be/src/benchmarks/CMakeLists.txt @@ -53,6 +53,7 @@ ADD_BE_BENCHMARK(network-perf-benchmark) ADD_BE_BENCHMARK(overflow-benchmark) ADD_BE_BENCHMARK(parse-timestamp-benchmark) ADD_BE_BENCHMARK(parquet-delta-benchmark) +ADD_BE_BENCHMARK(parquet-byte-stream-split-decoder-benchmark) ADD_BE_BENCHMARK(process-wide-locks-benchmark) ADD_BE_BENCHMARK(rle-benchmark) ADD_BE_BENCHMARK(row-batch-serialize-benchmark) diff --git a/be/src/benchmarks/parquet-byte-stream-split-decoder-benchmark.cc b/be/src/benchmarks/parquet-byte-stream-split-decoder-benchmark.cc new file mode 100644 index 0000000000..3434a1be9d --- /dev/null +++ b/be/src/benchmarks/parquet-byte-stream-split-decoder-benchmark.cc @@ -0,0 +1,514 @@ +/// Licensed to the Apache Software Foundation (ASF) under one +/// or more contributor license agreements. See the NOTICE file +/// distributed with this work for additional information +/// regarding copyright ownership. The ASF licenses this file +/// to you under the Apache License, Version 2.0 (the +/// "License"); you may not use this file except in compliance +/// with the License. You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, +/// software distributed under the License is distributed on an +/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +/// KIND, either express or implied. See the License for the +/// specific language governing permissions and limitations +/// under the License. + +#include +#include + +#include "exec/parquet/parquet-byte-stream-split-decoder.h" +#include "exec/parquet/parquet-byte-stream-split-encoder.h" +#include "util/benchmark.h" +#include "util/cpu-info.h" + +using namespace impala; + +constexpr int DATA_BATCH_SIZE = 1000; + +// -------------------------------- Benchmark Results --------------------------------- // + +// Machine Info: 13th Gen Intel(R) Core(TM) i9-13900 +// Data Batch Size = 1000 +// Data Pool Size for Pooled Data = 124 +// Skip Sizes (Read | Skip): 82 | 18 +// Stride Sizes (S | M | L): 15 | 2985 | 213525 + +// ━━━━━━━━━━━━━━━━━━━━━ Byte Stream Split functionality comparison ━━━━━━━━━━━━━━━━━━━━━━ + +// ────────────────────── Compile VS Runtime | Sequential | Batched ────────────────────── +// Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile +// (relative) (relative) (relative) +// ┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄ +// Compile Int 2.46e+03 2.49e+03 2.52e+03 1X 1X 1X +// Runtime Int 467 470 475 0.19X 0.189X 0.188X +// Compile Long 1.17e+03 1.19e+03 1.21e+03 0.476X 0.479X 0.48X +// Runtime Long 200 202 203 0.0811X 0.0811X 0.0806X + + +// ───────────────────── Type Comparison | Runtime | Random | Batched ──────────────────── +// Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile +// (relative) (relative) (relative) +// ┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄ +// Int 452 470 474 1X 1X 1X +// Float 453 469 474 1X 0.998X 1X +// 6 bytes 269 283 284 0.596X 0.602X 0.6X +// Long 194 202 203 0.429X 0.429X 0.429X +// Double 194 202 203 0.429X 0.429X 0.429X +// 11 bytes 137 141 142 0.304X 0.3X 0.3X + + +// ────────────── Repeating VS Sequential VS Random | Compile Time | Batched ───────────── +// Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile +// (relative) (relative) (relative) +// ┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄ +// Repeating Int 2.36e+03 2.47e+03 2.51e+03 1X 1X 1X +// Sequential Int 2.41e+03 2.48e+03 2.52e+03 1.02X 1X 1X +// Random Int 2.4e+03 2.49e+03 2.52e+03 1.02X 1.01X 1X +// Repeating Long 1.16e+03 1.18e+03 1.22e+03 0.491X 0.479X 0.484X +// Sequential Long 1.15e+03 1.19e+03 1.21e+03 0.486X 0.479X 0.48X +// Random Long 1.14e+03 1.18e+03 1.21e+03 0.484X 0.477X 0.481X + + +// ──────────────── Singles VS Batch VS Stride | Compile Time | Sequential ─────────────── +// Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile +// (relative) (relative) (relative) +// ┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄ +// Singles Int 1.24e+03 1.27e+03 1.28e+03 1X 1X 1X +// Batch Int 2.42e+03 2.48e+03 2.51e+03 1.95X 1.95X 1.96X +// Stride Int 2.41e+03 2.49e+03 2.51e+03 1.94X 1.95X 1.96X +// Singles Long 812 827 837 0.653X 0.65X 0.652X +// Batch Long 1.16e+03 1.19e+03 1.21e+03 0.934X 0.931X 0.941X +// Stride Long 1.18e+03 1.21e+03 1.23e+03 0.949X 0.954X 0.962X + + +// ──────── Small VS Medium VS Large Stride | Compile Time | Sequential | Batched ──────── +// Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile +// (relative) (relative) (relative) +// ┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄ +// S Stride Int 2.41e+03 2.49e+03 2.52e+03 1X 1X 1X +// M Stride Int 1.92e+03 2e+03 2.03e+03 0.795X 0.804X 0.806X +// L Stride Int 1.87e+03 1.92e+03 1.95e+03 0.774X 0.772X 0.774X +// S Stride Long 1.16e+03 1.22e+03 1.23e+03 0.481X 0.488X 0.49X +// M Stride Long 1.01e+03 1.08e+03 1.09e+03 0.419X 0.434X 0.433X +// L Stride Long 987 1.03e+03 1.04e+03 0.409X 0.413X 0.414X + +// --------------------------------- Data Structures ---------------------------------- // + +template +struct BSSTestData { + const std::vector& input_bdata; + const int stride; + + std::vector encoded_bdata; + std::vector output; + + BSSTestData(const std::vector& b, int s = B_SIZE) : input_bdata(b), stride(s) { + output.resize(stride * (input_bdata.size() / B_SIZE)); + GenerateBSSEncoded(); + } + + private: + void GenerateBSSEncoded() { + ParquetByteStreamSplitEncoder<0> encoder(B_SIZE); + std::vector temp(input_bdata.size()); + encoded_bdata.resize(input_bdata.size()); + encoder.NewPage(temp.data(), temp.size()); + for (int i = 0; i < input_bdata.size() / B_SIZE; i++) { + if (!encoder.PutBytes(input_bdata.data() + i * B_SIZE)) { + std::cerr << "Error: Value could not be put at ind " << i << std::endl; + return; + } + } + if (encoder.FinalizePage(encoded_bdata.data(), encoded_bdata.size()) + != input_bdata.size() / B_SIZE) { + std::cerr << "Error: Could not write all values upon FinalizePage" << std::endl; + return; + } + } +}; + +// --------------------------------- Helper Functions --------------------------------- // + +// ............ Data Generator Functions ............ // + +// Fill the vector with the same repeating data. (42,42,42,42,...) +void DataSameRepGen(std::vector* bdata, int b_size) { + for (int i = 0; i < DATA_BATCH_SIZE * b_size; i++) { + bdata->push_back(0x75); + } +} + +// Fill the vector with sequential data (41,42,43,44,...). +void DataSequentialGen(std::vector* bdata, int b_size) { + bdata->resize(DATA_BATCH_SIZE * b_size); + long offset = rand() * rand() - DATA_BATCH_SIZE; + for (int i = 0; i < DATA_BATCH_SIZE; i++) { + long j = i + offset; + memcpy(bdata->data() + i * b_size, &j, std::min((int)sizeof(j), b_size)); + } +} + +// Fill the vector with completely random data. +void DataRandGen(std::vector* bdata, int b_size) { + srand(154698135); + for (int i = 0; i < DATA_BATCH_SIZE * b_size; i++) { + bdata->push_back(rand() % numeric_limits::max()); + } +} + +// .......... Benchmark Data Transformer Functions .......... // + +template +std::vector GenerateStrided(const std::vector& input, int stride) { + std::vector strided_bd(input.size() / BSIZE * stride); + for (int i = 0; i < input.size() / BSIZE; i++) { + memcpy(strided_bd.data() + i * stride, input.data() + i * BSIZE, BSIZE); + } + return strided_bd; +} + +// ........... Output Checking Functions ............ // + +// We could use operator== instead of this, but using this function gives better +// readability, and makes debugging easier. +void testOutputCorrectness( + const std::vector& output, const std::vector& expected) { + if (output.size() != expected.size()) { + std::cerr << "Vector sizes do not match" << std::endl; + std::cerr << "Output size (bytes): " << output.size() << + ", Expected size (bytes): " << expected.size() << std::endl; + return; + } + for (int i = 0; i < expected.size(); i++) { + if (output[i] != expected[i]) { + std::cerr << "Vectors do not match at index " << i << std::endl; + return; + } + } +} + +// ------------------------------ Benchmarked Functions ------------------------------- // + +// ................... BSS Tests .................... // + +template +void BSS_DecodeBatch(int batch_size, void* d, ParquetByteStreamSplitDecoder& decoder) { + BSSTestData* data = reinterpret_cast*>(d); + + for (int batch = 0; batch < batch_size; batch++) { + uint8_t* output_ptr = data->output.data(); + decoder.NewPage(data->encoded_bdata.data(), data->encoded_bdata.size()); + + if (decoder.NextValues(data->encoded_bdata.size() / B_SIZE, output_ptr, B_SIZE) + != data->encoded_bdata.size() / B_SIZE) { + std::cerr << "Error: Could not decode all values" << std::endl; + return; + } + } +} + +template +void BSSRun_DecodeBatch(int batch_size, void* d) { + ParquetByteStreamSplitDecoder<0> decoder(B_SIZE); + BSS_DecodeBatch(batch_size, d, decoder); +} + +template +void BSSComp_DecodeBatch(int batch_size, void* d) { + ParquetByteStreamSplitDecoder decoder; + BSS_DecodeBatch(batch_size, d, decoder); +} + +template +void BSSComp_DecodeSingles(int batch_size, void* d) { + BSSTestData* data = reinterpret_cast*>(d); + ParquetByteStreamSplitDecoder decoder; + + for (int batch = 0; batch < batch_size; batch++) { + uint8_t* output_ptr = data->output.data(); + decoder.NewPage(data->encoded_bdata.data(), data->encoded_bdata.size()); + for (int j = 0; j < data->encoded_bdata.size() / sizeof(T); j++) { + if (decoder.NextValue(reinterpret_cast(output_ptr)) != 1) { + std::cerr << "Error: Could not decode all values" << std::endl; + return; + } + output_ptr += sizeof(T); + } + } +} + +template +void BSSComp_DecodeStride(int batch_size, void* d) { + BSSTestData* data = reinterpret_cast*>(d); + ParquetByteStreamSplitDecoder decoder; + + for (int batch = 0; batch < batch_size; batch++) { + uint8_t* output_ptr = data->output.data(); + decoder.NewPage(data->encoded_bdata.data(), data->encoded_bdata.size()); + if (decoder.NextValues(data->encoded_bdata.size() / B_SIZE, output_ptr, data->stride) + != data->encoded_bdata.size() / B_SIZE) { + std::cerr << "Error: Could not decode all values" << std::endl; + return; + } + } +} + +template +void BSSComp_DecodeSkip(int batch_size, void* d) { + BSSTestData* data = reinterpret_cast*>(d); + ParquetByteStreamSplitDecoder decoder; + + for (int batch = 0; batch < batch_size; batch++) { + uint8_t* output_ptr = data->output.data(); + decoder.NewPage(data->encoded_bdata.data(), data->encoded_bdata.size()); + for (int i = 0; i < decoder.GetTotalValueCount(); i += READ + SKIP) { + if (decoder.NextValues(READ, output_ptr, B_SIZE) < 0) { + std::cerr << "Error reading values at index " << i << std::endl; + return; + } + if (decoder.SkipValues(SKIP) < 0) { + std::cerr << "Error skipping values at index " << i << std::endl; + return; + } + output_ptr += READ * B_SIZE; + } + } +} + +// ------------------------------- Benchmark Functions -------------------------------- // + +// ................. BSS Benchmarks ................. // + +void CompileVSRuntime() { + std::vector byte_data4b; + std::vector byte_data8b; + DataSequentialGen(&byte_data4b, 4); + DataSequentialGen(&byte_data8b, 8); + + BSSTestData<4> dataIntTempl(byte_data4b); + BSSTestData<8> dataLongTempl(byte_data8b); + BSSTestData<4> dataIntConstr(byte_data4b); + BSSTestData<8> dataLongConstr(byte_data8b); + + // Compile - template, Runtime - constructor + Benchmark suite("Compile VS Runtime | Sequential | Batched"); + suite.AddBenchmark("Compile Int", BSSComp_DecodeBatch, &dataIntConstr); + suite.AddBenchmark("Runtime Int", BSSRun_DecodeBatch, &dataIntTempl); + suite.AddBenchmark("Compile Long", BSSComp_DecodeBatch, &dataLongConstr); + suite.AddBenchmark("Runtime Long", BSSRun_DecodeBatch, &dataLongTempl); + std::cout << suite.Measure(); + + // Test the output data to make sure that the functions are not optimised out + + testOutputCorrectness(dataIntTempl.output, dataIntTempl.input_bdata); + testOutputCorrectness(dataLongTempl.output, dataLongTempl.input_bdata); + testOutputCorrectness(dataIntConstr.output, dataIntConstr.input_bdata); + testOutputCorrectness(dataLongConstr.output, dataLongConstr.input_bdata); +} + +void TypeComparison() { + std::vector byte_data4b; + std::vector byte_data8b; + std::vector byte_data6b; + std::vector byte_data11b; + + DataRandGen(&byte_data4b, 4); + DataRandGen(&byte_data6b, 6); + DataRandGen(&byte_data8b, 8); + DataRandGen(&byte_data11b, 11); + + BSSTestData<4> dataInt(byte_data4b); + BSSTestData<8> dataLong(byte_data8b); + BSSTestData<6> data6b(byte_data6b); + BSSTestData<4> dataFloat(byte_data4b); + BSSTestData<8> dataDouble(byte_data8b); + BSSTestData<11> data11b(byte_data11b); + + // Since we are comparing types that are not a size of 4 or 8, we must use the runtime + // version. + Benchmark suite("Type Comparison | Runtime | Random | Batched"); + suite.AddBenchmark("Int", BSSRun_DecodeBatch, &dataInt); + suite.AddBenchmark("Float", BSSRun_DecodeBatch, &dataFloat); + suite.AddBenchmark("6 bytes", BSSRun_DecodeBatch<6>, &data6b); + suite.AddBenchmark("Long", BSSRun_DecodeBatch, &dataLong); + suite.AddBenchmark("Double", BSSRun_DecodeBatch, &dataDouble); + suite.AddBenchmark("11 bytes", BSSRun_DecodeBatch<11>, &data11b); + std::cout << suite.Measure(); + + // Test the output data to make sure that the functions are not optimised out + + testOutputCorrectness(dataInt.output, dataInt.input_bdata); + testOutputCorrectness(dataLong.output, dataLong.input_bdata); + testOutputCorrectness(data6b.output, data6b.input_bdata); + testOutputCorrectness(dataFloat.output, dataFloat.input_bdata); + testOutputCorrectness(dataDouble.output, dataDouble.input_bdata); + testOutputCorrectness(data11b.output, data11b.input_bdata); +} + +void RepeatingVSSequentialVSRandom() { + std::vector repeating_data4b; + std::vector repeating_data8b; + std::vector sequential_data4b; + std::vector sequential_data8b; + std::vector random_data4b; + std::vector random_data8b; + + DataSameRepGen(&repeating_data4b, 4); + DataSameRepGen(&repeating_data8b, 8); + DataSequentialGen(&sequential_data4b, 4); + DataSequentialGen(&sequential_data8b, 8); + DataRandGen(&random_data4b, 4); + DataRandGen(&random_data8b, 8); + + BSSTestData<4> dataIntRep(repeating_data4b); + BSSTestData<8> dataLongRep(repeating_data8b); + + BSSTestData<4> dataIntSeq(sequential_data4b); + BSSTestData<8> dataLongSeq(sequential_data8b); + + BSSTestData<4> dataIntRand(random_data4b); + BSSTestData<8> dataLongRand(random_data8b); + + Benchmark suite("Repeating VS Sequential VS Random | Compile Time | Batched"); + suite.AddBenchmark("Repeating Int", BSSComp_DecodeBatch, &dataIntRep); + suite.AddBenchmark("Sequential Int", BSSComp_DecodeBatch, &dataIntSeq); + suite.AddBenchmark("Random Int", BSSComp_DecodeBatch, &dataIntRand); + suite.AddBenchmark("Repeating Long", BSSComp_DecodeBatch, &dataLongRep); + suite.AddBenchmark("Sequential Long", BSSComp_DecodeBatch, &dataLongSeq); + suite.AddBenchmark("Random Long", BSSComp_DecodeBatch, &dataLongRand); + std::cout << suite.Measure(); + + // Test the output data to make sure that the functions are not optimised out + + testOutputCorrectness(dataIntRep.output, dataIntRep.input_bdata); + testOutputCorrectness(dataLongRep.output, dataLongRep.input_bdata); + testOutputCorrectness(dataIntSeq.output, dataIntSeq.input_bdata); + testOutputCorrectness(dataLongSeq.output, dataLongSeq.input_bdata); + testOutputCorrectness(dataIntRand.output, dataIntRand.input_bdata); + testOutputCorrectness(dataLongRand.output, dataLongRand.input_bdata); +} + +void SinglesVSBatchVSStride() { + std::vector byte_data4b; + std::vector byte_data8b; + DataSequentialGen(&byte_data4b, 4); + DataSequentialGen(&byte_data8b, 8); + + BSSTestData<4> dataIntSingles(byte_data4b); + BSSTestData<8> dataLongSingles(byte_data8b); + + BSSTestData<4> dataIntBatch(byte_data4b); + BSSTestData<8> dataLongBatch(byte_data8b); + + constexpr int stride = sizeof(int) + sizeof(long) + 7; + + BSSTestData<4> dataIntStride(byte_data4b, stride); + BSSTestData<8> dataLongStride(byte_data8b, stride); + + Benchmark suite("Singles VS Batch VS Stride | Compile Time | Sequential"); + suite.AddBenchmark("Singles Int", BSSComp_DecodeSingles, &dataIntSingles); + suite.AddBenchmark("Batch Int", BSSComp_DecodeBatch, &dataIntBatch); + suite.AddBenchmark("Stride Int", BSSComp_DecodeStride, &dataIntStride); + suite.AddBenchmark("Singles Long", BSSComp_DecodeSingles, &dataLongSingles); + suite.AddBenchmark("Batch Long", BSSComp_DecodeBatch, &dataLongBatch); + suite.AddBenchmark("Stride Long", BSSComp_DecodeStride, &dataLongStride); + std::cout << suite.Measure(); + + // Test the output data to make sure that the functions are not optimised out + + testOutputCorrectness(dataIntSingles.output, dataIntSingles.input_bdata); + testOutputCorrectness(dataLongSingles.output, dataLongSingles.input_bdata); + + testOutputCorrectness(dataIntBatch.output, dataIntBatch.input_bdata); + testOutputCorrectness(dataLongBatch.output, dataLongBatch.input_bdata); + + testOutputCorrectness(dataIntStride.output, + GenerateStrided(dataIntStride.input_bdata, dataIntStride.stride)); + testOutputCorrectness(dataLongStride.output, + GenerateStrided(dataLongStride.input_bdata, dataLongStride.stride)); +} + +void StrideSizeComparison(int strideS, int strideM, int strideL) { + std::vector byte_data4b; + std::vector byte_data8b; + + DataSequentialGen(&byte_data4b, 4); + DataSequentialGen(&byte_data8b, 8); + + BSSTestData<4> dataIntSStride(byte_data4b, strideS); + BSSTestData<4> dataIntMStride(byte_data4b, strideM); + BSSTestData<4> dataIntLStride(byte_data4b, strideL); + BSSTestData<8> dataLongSStride(byte_data8b, strideS); + BSSTestData<8> dataLongMStride(byte_data8b, strideM); + BSSTestData<8> dataLongLStride(byte_data8b, strideL); + + Benchmark suite("Small VS Medium VS Large Stride | Compile Time | Sequential | Batched"); + suite.AddBenchmark("S Stride Int", BSSComp_DecodeStride, &dataIntSStride); + suite.AddBenchmark("M Stride Int", BSSComp_DecodeStride, &dataIntMStride); + suite.AddBenchmark("L Stride Int", BSSComp_DecodeStride, &dataIntLStride); + suite.AddBenchmark("S Stride Long", BSSComp_DecodeStride, + &dataLongSStride); + suite.AddBenchmark("M Stride Long", BSSComp_DecodeStride, + &dataLongMStride); + suite.AddBenchmark("L Stride Long", BSSComp_DecodeStride, + &dataLongLStride); + std::cout << suite.Measure(); + + // Test the output data to make sure that the functions are not optimised out + + testOutputCorrectness(dataIntSStride.output, + GenerateStrided(dataIntSStride.input_bdata, dataIntSStride.stride)); + testOutputCorrectness(dataIntMStride.output, + GenerateStrided(dataIntMStride.input_bdata, dataIntMStride.stride)); + testOutputCorrectness(dataIntLStride.output, + GenerateStrided(dataIntLStride.input_bdata, dataIntLStride.stride)); + + testOutputCorrectness(dataLongSStride.output, + GenerateStrided(dataLongSStride.input_bdata, dataLongSStride.stride)); + testOutputCorrectness(dataLongMStride.output, + GenerateStrided(dataLongMStride.input_bdata, dataLongMStride.stride)); + testOutputCorrectness(dataLongLStride.output, + GenerateStrided(dataLongLStride.input_bdata, dataLongLStride.stride)); +} + +// ---------------------------------- Main Function ----------------------------------- // + +int main(int argc, char** argv) { + constexpr int pool = 124; + constexpr int strideS = sizeof(int) + sizeof(long) + 3; + constexpr int strideM = 199 * strideS; + constexpr int strideL = 14235 * strideS; + constexpr int read = 82; + constexpr int skip = 18; + + CpuInfo::Init(); + std::cout << " " << Benchmark::GetMachineInfo() << std::endl; + std::cout << " Data Batch Size = " << DATA_BATCH_SIZE + << std::endl; + std::cout << " Data Pool Size for Pooled Data = " << pool << std::endl; + std::cout << " Skip Sizes (Read | Skip): " << + read << " | " << skip << std::endl; + std::cout << " Stride Sizes (S | M | L): " << + strideS << " | " << strideM << " | " << strideL << std::endl; + std::cout << "\n\n"; + + std::cout << "\n\n"; + std::cout << "━━━━━━━━━━━━━━━━━━━━━ Byte Stream Split functionality comparison " + << "━━━━━━━━━━━━━━━━━━━━━━\n"; + std::cout << "\n"; + + CompileVSRuntime(); + std::cout << "\n\n"; + TypeComparison(); + std::cout << "\n\n"; + RepeatingVSSequentialVSRandom(); + std::cout << "\n\n"; + SinglesVSBatchVSStride(); + std::cout << "\n\n"; + StrideSizeComparison(strideS, strideM, strideL); + std::cout << "\n\n"; + + return 0; +} diff --git a/be/src/codegen/impala-ir.cc b/be/src/codegen/impala-ir.cc index 81248f8ba6..abc74c0e84 100644 --- a/be/src/codegen/impala-ir.cc +++ b/be/src/codegen/impala-ir.cc @@ -43,6 +43,7 @@ #include "exprs/ai-functions-ir.cc" #include "exprs/bit-byte-functions-ir.cc" #include "exprs/cast-functions-ir.cc" +#include "exprs/collection-functions-ir.cc" #include "exprs/compound-predicates-ir.cc" #include "exprs/conditional-functions-ir.cc" #include "exprs/datasketches-functions-ir.cc" diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt index 513ba47366..f705c305e6 100644 --- a/be/src/exprs/CMakeLists.txt +++ b/be/src/exprs/CMakeLists.txt @@ -30,6 +30,7 @@ add_library(ExprsIr ai-functions-ir.cc bit-byte-functions-ir.cc cast-functions-ir.cc + collection-functions-ir.cc compound-predicates-ir.cc conditional-functions-ir.cc datasketches-functions-ir.cc diff --git a/be/src/exprs/anyval-util.cc b/be/src/exprs/anyval-util.cc index 4cb9528b64..89a56f0443 100644 --- a/be/src/exprs/anyval-util.cc +++ b/be/src/exprs/anyval-util.cc @@ -90,8 +90,22 @@ FunctionContext::TypeDesc AnyValUtil::ColumnTypeToTypeDesc(const ColumnType& typ case TYPE_DATE: out.type = FunctionContext::TYPE_DATE; break; + case TYPE_ARRAY: + out.type = FunctionContext::TYPE_ARRAY; + DCHECK_EQ(type.children.size(), 1); + out.children.push_back(ColumnTypeToTypeDesc(type.children[0])); + break; + case TYPE_MAP: + out.type = FunctionContext::TYPE_MAP; + DCHECK_EQ(type.children.size(), 2); + out.children.push_back(ColumnTypeToTypeDesc(type.children[0])); + out.children.push_back(ColumnTypeToTypeDesc(type.children[1])); + break; case TYPE_STRUCT: out.type = FunctionContext::TYPE_STRUCT; + for (const ColumnType& child : type.children) { + out.children.push_back(ColumnTypeToTypeDesc(child)); + } break; default: DCHECK(false) << "Unknown type: " << type; @@ -137,6 +151,24 @@ ColumnType AnyValUtil::TypeDescToColumnType(const FunctionContext::TypeDesc& typ return ColumnType::CreateVarcharType(type.len); case FunctionContext::TYPE_DATE: return ColumnType(TYPE_DATE); + case FunctionContext::TYPE_ARRAY: { + DCHECK_EQ(type.children.size(), 1); + ColumnType element_type = TypeDescToColumnType(type.children[0]); + return ColumnType::CreateArrayType(element_type); + } + case FunctionContext::TYPE_MAP: { + DCHECK_EQ(type.children.size(), 2); + ColumnType key_type = TypeDescToColumnType(type.children[0]); + ColumnType value_type = TypeDescToColumnType(type.children[1]); + return ColumnType::CreateMapType(key_type, value_type); + } + case FunctionContext::TYPE_STRUCT: { + vector children; + for (const FunctionContext::TypeDesc& child : type.children) { + children.push_back(TypeDescToColumnType(child)); + } + return ColumnType::CreateStructType(children); + } default: DCHECK(false) << "Unknown type: " << type.type; return ColumnType(INVALID_TYPE); diff --git a/be/src/exprs/anyval-util.h b/be/src/exprs/anyval-util.h index f14f835269..ce3b48de70 100644 --- a/be/src/exprs/anyval-util.h +++ b/be/src/exprs/anyval-util.h @@ -20,6 +20,7 @@ #include +#include "runtime/collection-value.h" #include "runtime/date-value.h" #include "runtime/runtime-state.h" #include "runtime/string-value.inline.h" @@ -42,6 +43,7 @@ using impala_udf::TimestampVal; using impala_udf::StringVal; using impala_udf::DecimalVal; using impala_udf::DateVal; +using impala_udf::CollectionVal; class ObjectPool; @@ -203,6 +205,8 @@ class AnyValUtil { case TYPE_TIMESTAMP: return sizeof(TimestampVal); case TYPE_DECIMAL: return sizeof(DecimalVal); case TYPE_DATE: return sizeof(DateVal); + case TYPE_ARRAY: + case TYPE_MAP: return sizeof(CollectionVal); default: DCHECK(false) << t; return 0; @@ -227,6 +231,8 @@ class AnyValUtil { case TYPE_TIMESTAMP: return alignof(TimestampVal); case TYPE_DECIMAL: return alignof(DecimalVal); case TYPE_DATE: return alignof(DateVal); + case TYPE_ARRAY: + case TYPE_MAP: return alignof(CollectionVal); default: DCHECK(false) << t; return 0; @@ -337,6 +343,12 @@ class AnyValUtil { *reinterpret_cast(dst) = reinterpret_cast(slot)->ToDateVal(); return; + case TYPE_ARRAY: + case TYPE_MAP: { + *reinterpret_cast(dst) = + reinterpret_cast(slot)->ToCollectionVal(); + return; + } default: DCHECK(false) << "NYI: " << type; } diff --git a/be/src/exprs/collection-functions-ir.cc b/be/src/exprs/collection-functions-ir.cc new file mode 100644 index 0000000000..517990047a --- /dev/null +++ b/be/src/exprs/collection-functions-ir.cc @@ -0,0 +1,194 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exprs/collection-functions.h" +#include "exprs/scalar-expr.h" +#include "runtime/collection-value.h" +#include "runtime/descriptors.h" +#include "runtime/string-value.h" +#include "runtime/tuple.h" +#include "udf/udf-internal.h" + +#include + +namespace impala { + +using impala_udf::FunctionContext; +using impala_udf::BooleanVal; +using impala_udf::CollectionVal; + +namespace { + +struct ArrayContainsState { + const TupleDescriptor* tuple_desc = nullptr; + const SlotDescriptor* slot_desc = nullptr; + int tuple_byte_size = 0; + int slot_offset = 0; + NullIndicatorOffset null_offset; +}; + +ScalarExpr* GetArrayExpr(FunctionContext* ctx) { + const auto& non_constant_args = ctx->impl()->non_constant_args(); + for (const auto& child : non_constant_args) { + if (child.first != nullptr && child.first->type().IsCollectionType()) { + return child.first; + } + } + return nullptr; +} + +bool InitArrayContainsState(FunctionContext* ctx, ArrayContainsState* state) { + DCHECK(state != nullptr); + ScalarExpr* array_expr = GetArrayExpr(ctx); + if (array_expr == nullptr) { + ctx->SetError("array_contains requires a non-constant ARRAY argument."); + return false; + } + const TupleDescriptor* tuple_desc = array_expr->GetCollectionTupleDesc(); + if (tuple_desc == nullptr || tuple_desc->slots().empty()) { + ctx->SetError( + "Failed to resolve collection item tuple descriptor for array_contains()."); + return false; + } + if (tuple_desc->slots().size() != 1) { + ctx->SetError("array_contains only supports ARRAYs with a single element slot."); + return false; + } + state->tuple_desc = tuple_desc; + state->slot_desc = tuple_desc->slots()[0]; + state->tuple_byte_size = tuple_desc->byte_size(); + state->slot_offset = state->slot_desc->tuple_offset(); + state->null_offset = state->slot_desc->null_indicator_offset(); + return true; +} + +ArrayContainsState* GetOrCreateArrayState(FunctionContext* ctx) { + ArrayContainsState* state = reinterpret_cast( + ctx->GetFunctionState(FunctionContext::THREAD_LOCAL)); + if (state != nullptr) return state; + state = new ArrayContainsState(); + if (!InitArrayContainsState(ctx, state)) { + delete state; + ctx->SetFunctionState(FunctionContext::THREAD_LOCAL, nullptr); + return nullptr; + } + ctx->SetFunctionState(FunctionContext::THREAD_LOCAL, state); + return state; +} + +inline bool IsElementNull(const ArrayContainsState* state, Tuple* tuple) { + return state->slot_desc->is_nullable() && tuple->IsNull(state->null_offset); +} + +template +BooleanVal ArrayContainsPrimitive(FunctionContext* ctx, const CollectionVal& arr, + const ValType& item) { + if (arr.is_null || item.is_null) return BooleanVal::null(); + ArrayContainsState* state = GetOrCreateArrayState(ctx); + if (state == nullptr) return BooleanVal::null(); + DCHECK_GT(state->tuple_byte_size, 0); + uint8_t* tuple_ptr = arr.ptr; + for (int i = 0; i < arr.num_tuples; ++i, tuple_ptr += state->tuple_byte_size) { + Tuple* tuple = reinterpret_cast(tuple_ptr); + if (IsElementNull(state, tuple)) continue; + NativeType current_value = + *reinterpret_cast(tuple->GetSlot(state->slot_offset)); + if (current_value == item.val) return BooleanVal(true); + } + return BooleanVal(false); +} + +} // namespace + +void CollectionFunctions::ArrayContainsPrepare(FunctionContext* ctx, + FunctionContext::FunctionStateScope scope) { + if (scope != FunctionContext::THREAD_LOCAL) return; + auto* state = new ArrayContainsState(); + if (!InitArrayContainsState(ctx, state)) { + delete state; + state = nullptr; + } + ctx->SetFunctionState(scope, state); +} + +void CollectionFunctions::ArrayContainsClose(FunctionContext* ctx, + FunctionContext::FunctionStateScope scope) { + if (scope != FunctionContext::THREAD_LOCAL) return; + delete reinterpret_cast( + ctx->GetFunctionState(FunctionContext::THREAD_LOCAL)); + ctx->SetFunctionState(FunctionContext::THREAD_LOCAL, nullptr); +} + +BooleanVal CollectionFunctions::ArrayContainsBoolean( + FunctionContext* ctx, const CollectionVal& arr, const BooleanVal& item) { + return ArrayContainsPrimitive(ctx, arr, item); +} + +BooleanVal CollectionFunctions::ArrayContainsTinyInt( + FunctionContext* ctx, const CollectionVal& arr, const TinyIntVal& item) { + return ArrayContainsPrimitive(ctx, arr, item); +} + +BooleanVal CollectionFunctions::ArrayContainsSmallInt( + FunctionContext* ctx, const CollectionVal& arr, const SmallIntVal& item) { + return ArrayContainsPrimitive(ctx, arr, item); +} + +BooleanVal CollectionFunctions::ArrayContainsInt( + FunctionContext* ctx, const CollectionVal& arr, const IntVal& item) { + return ArrayContainsPrimitive(ctx, arr, item); +} + +BooleanVal CollectionFunctions::ArrayContainsBigInt( + FunctionContext* ctx, const CollectionVal& arr, const BigIntVal& item) { + return ArrayContainsPrimitive(ctx, arr, item); +} + +BooleanVal CollectionFunctions::ArrayContainsFloat( + FunctionContext* ctx, const CollectionVal& arr, const FloatVal& item) { + return ArrayContainsPrimitive(ctx, arr, item); +} + +BooleanVal CollectionFunctions::ArrayContainsDouble( + FunctionContext* ctx, const CollectionVal& arr, const DoubleVal& item) { + return ArrayContainsPrimitive(ctx, arr, item); +} + +BooleanVal CollectionFunctions::ArrayContainsString( + FunctionContext* ctx, const CollectionVal& arr, const StringVal& item) { + if (arr.is_null || item.is_null) return BooleanVal::null(); + ArrayContainsState* state = GetOrCreateArrayState(ctx); + if (state == nullptr) return BooleanVal::null(); + DCHECK_GT(state->tuple_byte_size, 0); + uint8_t* tuple_ptr = arr.ptr; + for (int i = 0; i < arr.num_tuples; ++i, tuple_ptr += state->tuple_byte_size) { + Tuple* tuple = reinterpret_cast(tuple_ptr); + if (IsElementNull(state, tuple)) continue; + StringValue* current_value = + reinterpret_cast(tuple->GetSlot(state->slot_offset)); + if (current_value->IrLen() != item.len) continue; + if (current_value->IrLen() == 0) return BooleanVal(true); + if (current_value->IrPtr() != nullptr && item.ptr != nullptr && + memcmp(current_value->IrPtr(), reinterpret_cast(item.ptr), + current_value->IrLen()) == 0) { + return BooleanVal(true); + } + } + return BooleanVal(false); +} + +} // namespace impala diff --git a/be/src/exprs/collection-functions.h b/be/src/exprs/collection-functions.h new file mode 100644 index 0000000000..c5f8c62b2a --- /dev/null +++ b/be/src/exprs/collection-functions.h @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "udf/udf.h" +#include "udf/udf-internal.h" // for CollectionVal + +namespace impala { + +using impala_udf::FunctionContext; +using impala_udf::BooleanVal; +using impala_udf::TinyIntVal; +using impala_udf::SmallIntVal; +using impala_udf::IntVal; +using impala_udf::BigIntVal; +using impala_udf::FloatVal; +using impala_udf::DoubleVal; +using impala_udf::StringVal; +using impala_udf::CollectionVal; + +class CollectionFunctions { + public: + static void ArrayContainsPrepare(FunctionContext* ctx, + FunctionContext::FunctionStateScope scope); + static void ArrayContainsClose(FunctionContext* ctx, + FunctionContext::FunctionStateScope scope); + + static BooleanVal ArrayContainsBoolean( + FunctionContext* ctx, const CollectionVal& arr, const BooleanVal& item); + static BooleanVal ArrayContainsTinyInt( + FunctionContext* ctx, const CollectionVal& arr, const TinyIntVal& item); + static BooleanVal ArrayContainsSmallInt( + FunctionContext* ctx, const CollectionVal& arr, const SmallIntVal& item); + static BooleanVal ArrayContainsInt( + FunctionContext* ctx, const CollectionVal& arr, const IntVal& item); + static BooleanVal ArrayContainsBigInt( + FunctionContext* ctx, const CollectionVal& arr, const BigIntVal& item); + static BooleanVal ArrayContainsFloat( + FunctionContext* ctx, const CollectionVal& arr, const FloatVal& item); + static BooleanVal ArrayContainsDouble( + FunctionContext* ctx, const CollectionVal& arr, const DoubleVal& item); + + static BooleanVal ArrayContainsString( + FunctionContext* ctx, const CollectionVal& arr, const StringVal& item); +}; + +} // namespace impala diff --git a/be/src/exprs/scalar-expr-evaluator.cc b/be/src/exprs/scalar-expr-evaluator.cc index e4d9cb3fdf..e6f75c115e 100644 --- a/be/src/exprs/scalar-expr-evaluator.cc +++ b/be/src/exprs/scalar-expr-evaluator.cc @@ -27,6 +27,7 @@ #include "exprs/bit-byte-functions.h" #include "exprs/case-expr.h" #include "exprs/cast-functions.h" +#include "exprs/collection-functions.h" #include "exprs/compound-predicates.h" #include "exprs/conditional-functions.h" #include "exprs/datasketches-functions.h" @@ -454,6 +455,7 @@ void ScalarExprEvaluator::InitBuiltinsDummy() { AggregateFunctions::InitNull(nullptr, nullptr); BitByteFunctions::CountSet(nullptr, TinyIntVal::null()); CastFunctions::CastToBooleanVal(nullptr, TinyIntVal::null()); + CollectionFunctions::ArrayContainsInt(nullptr, CollectionVal::null(), IntVal::null()); CompoundPredicate::Not(nullptr, BooleanVal::null()); ConditionalFunctions::NullIfZero(nullptr, TinyIntVal::null()); DataSketchesFunctions::DsHllEstimate(nullptr, StringVal::null()); diff --git a/be/src/runtime/collection-value.h b/be/src/runtime/collection-value.h index fbfdbc2822..076a56eb86 100644 --- a/be/src/runtime/collection-value.h +++ b/be/src/runtime/collection-value.h @@ -52,6 +52,11 @@ struct __attribute__((__packed__)) CollectionValue { return static_cast(num_tuples) * item_tuple_desc.byte_size(); } + /// Returns a CollectionVal representation in the output variable. + impala_udf::CollectionVal ToCollectionVal() const { + return impala_udf::CollectionVal(ptr, num_tuples); + } + /// For C++/IR interop, we need to be able to look up types by name. static const char* LLVM_CLASS_NAME; }; diff --git a/be/src/runtime/types.h b/be/src/runtime/types.h index 578dfa819d..b4709a77d6 100644 --- a/be/src/runtime/types.h +++ b/be/src/runtime/types.h @@ -156,6 +156,29 @@ struct ColumnType { return ret; } + static ColumnType CreateArrayType(const ColumnType& element_type) { + ColumnType ret; + ret.type = TYPE_ARRAY; + ret.children.push_back(element_type); + return ret; + } + + static ColumnType CreateMapType(const ColumnType& key_type, + const ColumnType& value_type) { + ColumnType ret; + ret.type = TYPE_MAP; + ret.children.push_back(key_type); + ret.children.push_back(value_type); + return ret; + } + + static ColumnType CreateStructType(const std::vector& children) { + ColumnType ret; + ret.type = TYPE_STRUCT; + ret.children = children; + return ret; + } + // Matches the results of createAdjustedDecimalType in front-end code. static ColumnType CreateAdjustedDecimalType(int precision, int scale) { if (precision > MAX_PRECISION) { diff --git a/be/src/udf/udf.h b/be/src/udf/udf.h index a8a8675445..99eed0f306 100644 --- a/be/src/udf/udf.h +++ b/be/src/udf/udf.h @@ -25,6 +25,7 @@ #include #include #include +#include // Only use noexcept if the compiler supports C++11 (some system compilers may not // or may have it disabled by default). @@ -109,6 +110,8 @@ class FunctionContext { TYPE_VARCHAR, // A fixed-size buffer, passed as a StringVal. TYPE_FIXED_UDA_INTERMEDIATE, + TYPE_ARRAY, + TYPE_MAP, TYPE_STRUCT }; @@ -122,6 +125,12 @@ class FunctionContext { /// Only valid if type is one of TYPE_FIXED_BUFFER, TYPE_FIXED_UDA_INTERMEDIATE or /// TYPE_VARCHAR. int len; + + /// Only valid if type is one of TYPE_ARRAY, TYPE_MAP, or TYPE_STRUCT. + /// For TYPE_ARRAY: children[0] is the element type. + /// For TYPE_MAP: children[0] is the key type, children[1] is the value type. + /// For TYPE_STRUCT: children contains all field types. + std::vector children; }; struct UniqueId { diff --git a/be/src/util/symbols-util.cc b/be/src/util/symbols-util.cc index c2dac62664..d3dca20350 100644 --- a/be/src/util/symbols-util.cc +++ b/be/src/util/symbols-util.cc @@ -137,6 +137,7 @@ static void AppendAnyValType(int namespace_id, const ColumnType& type, stringstr CASE_TYPE_APPEND_MANGLED_TOKEN(TYPE_CHAR, StringVal) CASE_TYPE_APPEND_MANGLED_TOKEN(TYPE_TIMESTAMP, TimestampVal) CASE_TYPE_APPEND_MANGLED_TOKEN(TYPE_DECIMAL, DecimalVal) + CASE_TYPE_APPEND_MANGLED_TOKEN(TYPE_ARRAY, CollectionVal) default: DCHECK(false) << "NYI: " << type.DebugString(); diff --git a/bin/run-workload.py b/bin/run-workload.py index 78d118ee28..b99b2d41a9 100755 --- a/bin/run-workload.py +++ b/bin/run-workload.py @@ -145,6 +145,11 @@ def default(self, obj,): if isinstance(obj, datetime): # Convert datetime into an standard iso string return obj.isoformat() + if isinstance(obj, bytes): + # Impyla can leave a string value as bytes when it is unable to decode it to UTF-8. + # TPC-DS has queries that produce non-UTF-8 results (e.g. Q30 on scale 20) + # Convert bytes to strings to make JSON encoding work + return obj.decode(encoding="utf-8", errors="backslashreplace") elif isinstance(obj, (Query, HiveQueryResult, QueryExecConfig, TableFormatInfo)): # Serialize these objects manually by returning their __dict__ methods. return obj.__dict__ diff --git a/bin/single_node_perf_run.py b/bin/single_node_perf_run.py index d141f8694c..c54c71a494 100755 --- a/bin/single_node_perf_run.py +++ b/bin/single_node_perf_run.py @@ -79,7 +79,6 @@ import json import os import pipes -import sh import shutil import subprocess import sys @@ -99,6 +98,16 @@ def configured_call(cmd): return subprocess.check_call(["bash", "-c", cmd]) +def run_git(args): + """Runs git without capturing output (stdout passes through to stdout)""" + subprocess.check_call(["git"] + args, text=True) + + +def get_git_output(args): + """Runs git, capturing the output and returning it""" + return subprocess.check_output(["git"] + args, text=True) + + def load_data(db_to_load, table_formats, scale): """Loads a database with a particular scale factor.""" all_formats = ("text/none," + table_formats if "text/none" not in table_formats @@ -115,12 +124,12 @@ def load_data(db_to_load, table_formats, scale): def get_git_hash_for_name(name): - return sh.git("rev-parse", name).strip() + return get_git_output(["rev-parse", name]).strip() def build(git_hash, options): """Builds Impala in release mode; doesn't build tests.""" - sh.git.checkout(git_hash) + run_git(["checkout", git_hash]) buildall = ["{0}/buildall.sh".format(IMPALA_HOME), "-notests", "-release", "-noclean"] if options.ninja: buildall += ["-ninja"] @@ -168,15 +177,20 @@ def run_workload(base_dir, workloads, options): def report_benchmark_results(file_a, file_b, description): """Wrapper around report_benchmark_result.py.""" + performance_result = subprocess.check_output( + ["{0}/tests/benchmark/report_benchmark_results.py".format(IMPALA_HOME), + "--reference_result_file={0}".format(file_a), + "--input_result_file={0}".format(file_b), + '--report_description="{0}"'.format(description)], + text=True) + + # Output the performance result to stdout for convenience + print(performance_result) + + # Dump the performance result to a file to preserve result = os.path.join(IMPALA_PERF_RESULTS, "latest", "performance_result.txt") with open(result, "w") as f: - subprocess.check_call( - ["{0}/tests/benchmark/report_benchmark_results.py".format(IMPALA_HOME), - "--reference_result_file={0}".format(file_a), - "--input_result_file={0}".format(file_b), - '--report_description="{0}"'.format(description)], - stdout=f) - sh.cat(result, _out=sys.stdout) + f.write(performance_result) def compare(base_dir, hash_a, hash_b, options): @@ -190,19 +204,17 @@ def compare(base_dir, hash_a, hash_b, options): if options.split_profiles: generate_profile_files(file_a, hash_a, base_dir) generate_profile_files(file_b, hash_b, base_dir) - sh.diff("-u", - os.path.join(base_dir, hash_a + "_profiles"), - os.path.join(base_dir, hash_b + "_profiles"), - _out=os.path.join(IMPALA_HOME, "performance_result_profile_diff.txt"), - _ok_code=[0, 1]) + with open(os.path.join(IMPALA_HOME, "performance_result_profile_diff.txt"), "w") as f: + # This does not check that the diff command succeeds + subprocess.run(["diff", "-u", os.path.join(base_dir, hash_a + "_profiles"), + os.path.join(base_dir, hash_b + "_profiles")], stdout=f, text=True) else: generate_profile_file(file_a, hash_a, base_dir) generate_profile_file(file_b, hash_b, base_dir) - sh.diff("-u", - os.path.join(base_dir, hash_a + "_profile.txt"), - os.path.join(base_dir, hash_b + "_profile.txt"), - _out=os.path.join(IMPALA_HOME, "performance_result_profile_diff.txt"), - _ok_code=[0, 1]) + with open(os.path.join(IMPALA_HOME, "performance_result_profile_diff.txt"), "w") as f: + # This does not check that the diff command succeeds + subprocess.run(["diff", "-u", os.path.join(base_dir, hash_a + "_profile.txt"), + os.path.join(base_dir, hash_b + "_profile.txt")], stdout=f, text=True) def generate_profile_file(name, hash, base_dir): @@ -253,16 +265,17 @@ def backup_workloads(): Used to keep workloads from being clobbered by git checkout. """ temp_dir = mkdtemp() - sh.cp(os.path.join(IMPALA_HOME, "testdata", "workloads"), - temp_dir, R=True, _out=sys.stdout, _err=sys.stderr) + shutil.copytree(os.path.join(IMPALA_HOME, "testdata", "workloads"), + os.path.join(temp_dir, "workloads")) print("Backed up workloads to {0}".format(temp_dir)) return temp_dir def restore_workloads(source): """Restores the workload directory from source into the Impala tree.""" - sh.cp(os.path.join(source, "workloads"), os.path.join(IMPALA_HOME, "testdata"), - R=True, _out=sys.stdout, _err=sys.stderr) + # dirs_exist_ok=True allows this to overwrite the existing files + shutil.copytree(os.path.join(source, "workloads"), + os.path.join(IMPALA_HOME, "testdata", "workloads"), dirs_exist_ok=True) def perf_ab_test(options, args): @@ -314,7 +327,7 @@ def perf_ab_test(options, args): hash_b = get_git_hash_for_name(args[1]) # discard any changes created by the previous restore_workloads() shutil.rmtree("testdata/workloads") - sh.git.checkout("--", "testdata/workloads") + run_git(["checkout", "--", "testdata/workloads"]) build(hash_b, options) restore_workloads(workload_dir) start_impala(options.num_impalads, options) @@ -399,17 +412,17 @@ def main(): os.chdir(IMPALA_HOME) - if sh.git("status", "--porcelain", "--untracked-files=no", _out=None).strip(): - sh.git("status", "--porcelain", "--untracked-files=no", _out=sys.stdout) + if get_git_output(["status", "--porcelain", "--untracked-files=no"]).strip(): + run_git(["status", "--porcelain", "--untracked-files=no"]) # Something went wrong, let's dump the actual diff to make it easier to # track down print("#### Working copy is dirty, dumping the diff #####") - sh.git("--no-pager", "diff", _out=sys.stdout) + run_git(["--no-pager", "diff"]) print("#### End of diff #####") raise Exception("Working copy is dirty. Consider 'git stash' and try again.") # Save the current hash to be able to return to this place in the tree when done - current_hash = sh.git("rev-parse", "--abbrev-ref", "HEAD").strip() + current_hash = get_git_output(["rev-parse", "--abbrev-ref", "HEAD"]).strip() if current_hash == "HEAD": current_hash = get_git_hash_for_name("HEAD") @@ -419,8 +432,8 @@ def main(): finally: # discard any changes created by the previous restore_workloads() shutil.rmtree("testdata/workloads") - sh.git.checkout("--", "testdata/workloads") - sh.git.checkout(current_hash) + run_git(["checkout", "--", "testdata/workloads"]) + run_git(["checkout", current_hash]) restore_workloads(workloads) diff --git a/common/function-registry/gen_builtins_catalog.py b/common/function-registry/gen_builtins_catalog.py index ed2b0b48cf..7f93c1ba28 100755 --- a/common/function-registry/gen_builtins_catalog.py +++ b/common/function-registry/gen_builtins_catalog.py @@ -45,6 +45,7 @@ \n\ package org.apache.impala.builtins;\n\ \n\ +import org.apache.impala.catalog.ArrayType;\n\ import org.apache.impala.catalog.Type;\n\ import org.apache.impala.catalog.Db;\n\ \n\ @@ -82,6 +83,20 @@ def add_function(fn_meta_data, user_visible): meta_data_entries.append(entry) +def convert_type_to_java(type_str): + """Convert a type string to Java Type expression. + + Handles complex types like ARRAY by converting them to + new ArrayType(Type.INT). + """ + if type_str.startswith("ARRAY<") and type_str.endswith(">"): + # Extract the element type from ARRAY + element_type = type_str[6:-1] # Remove "ARRAY<" and ">" + return "new ArrayType(Type.%s)" % element_type + else: + return "Type." + type_str + + def generate_fe_entry(entry, name): java_output = "" java_output += "\"" + name + "\"" @@ -105,9 +120,9 @@ def generate_fe_entry(entry, name): else: java_output += ", false" - java_output += ", Type." + entry["ret_type"] + java_output += ", " + convert_type_to_java(entry["ret_type"]) for arg in entry["args"]: - java_output += ", Type." + arg + java_output += ", " + convert_type_to_java(arg) return java_output diff --git a/common/function-registry/impala_functions.py b/common/function-registry/impala_functions.py index eb7a53197f..ac341a7047 100644 --- a/common/function-registry/impala_functions.py +++ b/common/function-registry/impala_functions.py @@ -728,6 +728,40 @@ def symbol(class_name, fn_name, templated_type = None): [['isfalse'], 'BOOLEAN', ['BOOLEAN'], 'impala::ConditionalFunctions::IsFalse'], [['isnotfalse'], 'BOOLEAN', ['BOOLEAN'], 'impala::ConditionalFunctions::IsNotFalse'], + # Collection/Array functions +[['array_contains'], 'BOOLEAN', ['ARRAY', 'TINYINT'], + 'impala::CollectionFunctions::ArrayContainsTinyInt', + 'impala::CollectionFunctions::ArrayContainsPrepare', + 'impala::CollectionFunctions::ArrayContainsClose'], +[['array_contains'], 'BOOLEAN', ['ARRAY', 'SMALLINT'], + 'impala::CollectionFunctions::ArrayContainsSmallInt', + 'impala::CollectionFunctions::ArrayContainsPrepare', + 'impala::CollectionFunctions::ArrayContainsClose'], +[['array_contains'], 'BOOLEAN', ['ARRAY', 'INT'], + 'impala::CollectionFunctions::ArrayContainsInt', + 'impala::CollectionFunctions::ArrayContainsPrepare', + 'impala::CollectionFunctions::ArrayContainsClose'], +[['array_contains'], 'BOOLEAN', ['ARRAY', 'BIGINT'], + 'impala::CollectionFunctions::ArrayContainsBigInt', + 'impala::CollectionFunctions::ArrayContainsPrepare', + 'impala::CollectionFunctions::ArrayContainsClose'], +[['array_contains'], 'BOOLEAN', ['ARRAY', 'FLOAT'], + 'impala::CollectionFunctions::ArrayContainsFloat', + 'impala::CollectionFunctions::ArrayContainsPrepare', + 'impala::CollectionFunctions::ArrayContainsClose'], +[['array_contains'], 'BOOLEAN', ['ARRAY', 'DOUBLE'], + 'impala::CollectionFunctions::ArrayContainsDouble', + 'impala::CollectionFunctions::ArrayContainsPrepare', + 'impala::CollectionFunctions::ArrayContainsClose'], +[['array_contains'], 'BOOLEAN', ['ARRAY', 'STRING'], + 'impala::CollectionFunctions::ArrayContainsString', + 'impala::CollectionFunctions::ArrayContainsPrepare', + 'impala::CollectionFunctions::ArrayContainsClose'], +[['array_contains'], 'BOOLEAN', ['ARRAY', 'BOOLEAN'], + 'impala::CollectionFunctions::ArrayContainsBoolean', + 'impala::CollectionFunctions::ArrayContainsPrepare', + 'impala::CollectionFunctions::ArrayContainsClose'], + # Utility functions [['uuid'], 'STRING', [], '_ZN6impala16UtilityFunctions4UuidEPN10impala_udf15FunctionContextE', diff --git a/docs/impala.ditamap b/docs/impala.ditamap index 466bb8f86d..8a0f962925 100644 --- a/docs/impala.ditamap +++ b/docs/impala.ditamap @@ -265,6 +265,8 @@ under the License. + + diff --git a/docs/topics/impala_sync_hms_events_strict_mode.xml b/docs/topics/impala_sync_hms_events_strict_mode.xml new file mode 100644 index 0000000000..45edb6da0b --- /dev/null +++ b/docs/topics/impala_sync_hms_events_strict_mode.xml @@ -0,0 +1,73 @@ + + + + + SYNC_HMS_EVENTS_STRICT_MODE Query Option + SYNC HMS EVENTS STRICT MODE + + + + + + + + + + + +

SYNC_HMS_EVENTS_STRICT_MODE query + optionThis query option controls the behavior of the query coordinator if it + cannot successfully sync with the latest HMS events (e.g., if the waiting time set by + SYNC_HMS_EVENTS_WAIT_TIME_S is reached, or the event processor is in an + error state).

+

+

+

Query Range: + True or False

+

+

This option determines whether Impala favors consistency (failing the query) or availability + (starting the query planning despite potential lag).

    +
  • True (Strict Mode): If Catalog service cannot sync the metadata with + the latest HMS event ID before the timeout is reached, the coordinator fails the query + with an error. This prioritizes correctness and metadata consistency.

    If the coordinator + fails the query when SYNC_HMS_EVENTS_STRICT_MODE is set to + TRUE (Strict Mode), you will see an error message starting with + Failed to sync events from Metastore: and then the specific reason, + such as HMS event processing is disabled.

  • +
  • False (Non-Strict Mode - Default): If the waiting times out, the + coordinator starts planning the query immediately but issues a warning message in the + query profile. This prioritizes availability, allowing the query to run, but risks working + with slightly stale metadata.
  • +

+ This option is only relevant if + SYNC_HMS_EVENTS_WAIT_TIME_S is set to a value greater than 0. +

Example

+

The following example shows how to set SYNC_HMS_EVENTS_STRICT_MODE to + TRUE for a specific query to enforce + consistency:SET SYNC_HMS_EVENTS_STRICT_MODE=TRUE; +SELECT COUNT(*) FROM functional.alltypes;

+

Added in: +

+

+

+ +

+
+
diff --git a/docs/topics/impala_sync_hms_events_wait_time_s.xml b/docs/topics/impala_sync_hms_events_wait_time_s.xml new file mode 100644 index 0000000000..3a30d3e8ea --- /dev/null +++ b/docs/topics/impala_sync_hms_events_wait_time_s.xml @@ -0,0 +1,79 @@ + + + + + SYNC_HMS_EVENTS_WAIT_TIME_S Query Option + SYNC HMS EVENTS WAIT TIME S + + + + + + + + + + + +

SYNC_HMS_EVENTS_WAIT_TIME_S query optionThis query + option controls the maximum time Impala will wait for the Catalog Service to sync with the + latest events from the Hive Metastore (HMS) before starting query planning.

+

+

+

Query Range: > = 0

+

+

Setting this option to a positive value (in seconds) enables a new mechanism where only the + planning thread of that query will pause and wait for the Catalog service's event processor to + fully apply any outstanding metadata changes from the HMS before the query is analyzed and + planned.This mechanism only guarantees that HMS modifications + that occurred before query planning started will be synced by the Catalog service. Any HMS + modifications that happen after the query begins its planning phase are not guaranteed to be + applied.

    +
  • This is typically used after an external process (like Hive or Spark) has modified a + dependent table, ensuring Impala's query sees the most current metadata, such as newly + added partitions.
  • +
  • The default value of 0 disables this waiting mechanism.
  • +
  • The wait time could be set based on the maximum observed event processing lag in your + cluster (visible via the Catalogd WebUI /events page).
  • +

+

Example:

+

When a Hive table is updated by an INSERT operation on dynamic partitions, + you can use the sync_hms_events_wait_time_s query option in Impala to ensure + the metadata is synchronized before you query the table.

+

Impala waits up to the specified time (sync_hms_events_wait_time_s) for Hive + Metastore Service (HMS) events to synchronize automatically.

+

The following example demonstrates setting the option to safely query a Hive table immediately after it is updated. In this case, Impala waits up to 300 seconds for the synchronization to complete.

+ hive> insert into tbl partition(p) select * from tbl2; +impala> set sync_hms_events_wait_time_s=300; +impala> select * from tbl; +

+ You do not need to run a REFRESH + command or wait explicitly on the client side. +

+

+ Added in: + +

+

+

+ +

+
+
diff --git a/fe/src/main/java/org/apache/impala/catalog/ArrayType.java b/fe/src/main/java/org/apache/impala/catalog/ArrayType.java index 6e81e267d9..992d3a5845 100644 --- a/fe/src/main/java/org/apache/impala/catalog/ArrayType.java +++ b/fe/src/main/java/org/apache/impala/catalog/ArrayType.java @@ -55,6 +55,19 @@ public int hashCode() { return 1 + itemType_.hashCode(); } + @Override + public String toString() { + return toSql(); + } + + @Override + public boolean matchesType(Type t) { + if (equals(t)) return true; + if (!t.isArrayType()) return false; + ArrayType otherArrayType = (ArrayType) t; + return itemType_.matchesType(otherArrayType.itemType_); + } + @Override public void toThrift(TColumnType container) { TTypeNode node = new TTypeNode(); diff --git a/fe/src/main/java/org/apache/impala/catalog/MapType.java b/fe/src/main/java/org/apache/impala/catalog/MapType.java index 41e60ec1a3..23be48c12e 100644 --- a/fe/src/main/java/org/apache/impala/catalog/MapType.java +++ b/fe/src/main/java/org/apache/impala/catalog/MapType.java @@ -56,6 +56,20 @@ public int hashCode() { return Objects.hash(keyType_, valueType_); } + @Override + public String toString() { + return toSql(); + } + + @Override + public boolean matchesType(Type t) { + if (equals(t)) return true; + if (!t.isMapType()) return false; + MapType otherMapType = (MapType) t; + return keyType_.matchesType(otherMapType.keyType_) && + valueType_.matchesType(otherMapType.valueType_); + } + @Override public String toSql(int depth) { if (depth >= MAX_NESTING_DEPTH) return "MAP<...>"; diff --git a/fe/src/main/java/org/apache/impala/catalog/StructType.java b/fe/src/main/java/org/apache/impala/catalog/StructType.java index c6c043f401..27fdedce24 100644 --- a/fe/src/main/java/org/apache/impala/catalog/StructType.java +++ b/fe/src/main/java/org/apache/impala/catalog/StructType.java @@ -121,6 +121,26 @@ public int hashCode() { return fields_.hashCode(); } + @Override + public String toString() { + return toSql(); + } + + @Override + public boolean matchesType(Type t) { + if (equals(t)) return true; + if (!t.isStructType()) return false; + StructType otherStructType = (StructType) t; + if (fields_.size() != otherStructType.fields_.size()) return false; + for (int i = 0; i < fields_.size(); ++i) { + if (!fields_.get(i).getType().matchesType( + otherStructType.fields_.get(i).getType())) { + return false; + } + } + return true; + } + @Override public void toThrift(TColumnType container) { TTypeNode node = new TTypeNode(); diff --git a/fe/src/main/java/org/apache/impala/util/FunctionUtils.java b/fe/src/main/java/org/apache/impala/util/FunctionUtils.java index 28883685de..dfc8a4825d 100644 --- a/fe/src/main/java/org/apache/impala/util/FunctionUtils.java +++ b/fe/src/main/java/org/apache/impala/util/FunctionUtils.java @@ -28,9 +28,11 @@ import java.util.UUID; import org.apache.hadoop.fs.Path; +import org.apache.impala.catalog.ArrayType; import org.apache.impala.catalog.Db; import org.apache.impala.catalog.Function; import org.apache.impala.catalog.Function.CompareMode; +import org.apache.impala.catalog.MapType; import org.apache.impala.catalog.ScalarFunction; import org.apache.impala.catalog.Type; import org.apache.impala.common.FileSystemUtil; @@ -156,10 +158,18 @@ public int compare(Function f1, Function f2) { } private int typeCompare(Type t1, Type t2) { - Preconditions.checkState(!t1.isComplexType()); - Preconditions.checkState(!t2.isComplexType()); - return Integer.compare(t1.getPrimitiveType().ordinal(), - t2.getPrimitiveType().ordinal()); + if (t1.isComplexType() && t2.isComplexType()) { + // For complex types, compare their SQL representations + // (comparing individual fields would be more complex and is rarely needed) + return t1.toSql().compareTo(t2.toSql()); + } + if (t1.isScalarType() && t2.isScalarType()) { + // For primitive types, use the original comparison + return Integer.compare(t1.getPrimitiveType().ordinal(), + t2.getPrimitiveType().ordinal()); + } + // Complex types come after primitive types + return t1.isComplexType() ? 1 : -1; } } } diff --git a/infra/python/deps/requirements.txt b/infra/python/deps/requirements.txt index ae27ff1696..ad4f43e11f 100644 --- a/infra/python/deps/requirements.txt +++ b/infra/python/deps/requirements.txt @@ -54,7 +54,6 @@ requests == 2.21.0 urllib3 == 1.24.2 certifi == 2020.12.5 sasl == 0.2.1 -sh == 1.11 six == 1.14.0 sqlparse == 0.3.1 texttable == 0.8.3 diff --git a/testdata/workloads/functional-query/queries/QueryTest/array-contains.test b/testdata/workloads/functional-query/queries/QueryTest/array-contains.test new file mode 100644 index 0000000000..9857d29a66 --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/array-contains.test @@ -0,0 +1,225 @@ +==== +---- QUERY +# Test array_contains with int array - element exists +select id, array_contains(arr1, 10) from complextypes_arrays where id=5 +---- RESULTS +5,true +---- TYPES +int,boolean +==== +---- QUERY +# Test array_contains with int array - element does not exist +select id, array_contains(arr1, 99) from complextypes_arrays where id=5 +---- RESULTS +5,false +---- TYPES +int,boolean +==== +---- QUERY +# Test array_contains with string array - element exists +select id, array_contains(arr2, 'ten') from complextypes_arrays where id=5 +---- RESULTS +5,true +---- TYPES +int,boolean +==== +---- QUERY +# Test array_contains with string array - element does not exist +select id, array_contains(arr2, 'notfound') from complextypes_arrays where id=5 +---- RESULTS +5,false +---- TYPES +int,boolean +==== +---- QUERY +# Test array_contains with int array - all rows +select id, array_contains(arr1, 10) from complextypes_arrays order by id +---- RESULTS +1,false +2,false +3,true +4,true +5,true +6,false +7,false +8,false +9,NULL +10,false +11,NULL +---- TYPES +int,boolean +==== +---- QUERY +# Test array_contains with string array - all rows +select id, array_contains(arr2, 'ten') from complextypes_arrays order by id +---- RESULTS +1,false +2,false +3,true +4,true +5,true +6,false +7,false +8,false +9,false +10,NULL +11,NULL +---- TYPES +int,boolean +==== +---- QUERY +# Test array_contains searching for NULL in int array +select id, array_contains(arr1, cast(NULL as int)) from complextypes_arrays where id=5 +---- RESULTS +5,NULL +---- TYPES +int,boolean +==== +---- QUERY +# Test array_contains in WHERE clause with int array +select id from complextypes_arrays where array_contains(arr1, 10) order by id +---- RESULTS +3 +4 +5 +---- TYPES +int +==== +---- QUERY +# Test array_contains in WHERE clause with string array +select id from complextypes_arrays where array_contains(arr2, 'ten') order by id +---- RESULTS +3 +4 +5 +---- TYPES +int +==== +---- QUERY +# Test array_contains with multiple conditions - int and string arrays +select id, array_contains(arr1, 10), array_contains(arr2, 'ten') +from complextypes_arrays where id <= 3 order by id +---- RESULTS +1,false,false +2,false,false +3,true,true +---- TYPES +int,boolean,boolean +==== +---- QUERY +# Test array_contains with JOIN +select t1.id, array_contains(t1.arr1, 10), array_contains(t2.arr2, 'ten') +from complextypes_arrays t1 join complextypes_arrays t2 on t1.id = t2.id +where t1.id = 5 +---- RESULTS +5,true,true +---- TYPES +int,boolean,boolean +==== +---- QUERY +# Test array_contains with UNION ALL +select id, array_contains(arr1, 10) from complextypes_arrays where id=1 +union all +select id, array_contains(arr2, 'ten') from complextypes_arrays where id=3 +---- RESULTS +1,false +3,true +---- TYPES +int,boolean +==== +---- QUERY +# Test array_contains with aggregation +select count(*) from complextypes_arrays where array_contains(arr1, 10) +---- RESULTS +3 +---- TYPES +bigint +==== +---- QUERY +# Test array_contains with GROUP BY +select array_contains(arr1, 10) as contains_ten, count(*) +from complextypes_arrays +group by array_contains(arr1, 10) order by contains_ten +---- RESULTS +false,6 +true,3 +NULL,2 +---- TYPES +boolean,bigint +==== +---- QUERY +# Test array_contains with ORDER BY +select id, array_contains(arr1, 10) as contains_ten +from complextypes_arrays +where id <= 5 +order by contains_ten, id +---- RESULTS +1,false +2,false +3,true +4,true +5,true +---- TYPES +int,boolean +==== +---- QUERY +# Test array_contains with subquery +select id from complextypes_arrays t +where array_contains(arr1, 10) = true order by id +---- RESULTS +3 +4 +5 +---- TYPES +int +==== +---- QUERY +# Test array_contains with CASE expression - testing both int and string arrays +select id, + case when array_contains(arr1, 10) then 'contains 10 in arr1' + when array_contains(arr2, 'ten') then 'contains ten in arr2' + else 'neither' + end as result +from complextypes_arrays where id <= 4 order by id +---- RESULTS +1,'neither' +2,'neither' +3,'contains 10 in arr1' +4,'contains 10 in arr1' +---- TYPES +int,string +==== +---- QUERY +# Test array_contains with double array - element exists +select i, array_contains(arr, cast(10.0 as double)) from functional_parquet.iceberg_metadata_alltypes where i=1 +---- RESULTS +1,true +---- TYPES +int,boolean +==== +---- QUERY +# Test array_contains with double array - element does not exist +select i, array_contains(arr, cast(99.9 as double)) from functional_parquet.iceberg_metadata_alltypes where i=1 +---- RESULTS +1,false +---- TYPES +int,boolean +==== +---- QUERY +# Test array_contains with double array - all rows +select i, array_contains(arr, -2e+100) from functional_parquet.iceberg_metadata_alltypes order by i +---- RESULTS +1,false +5,true +5,true +---- TYPES +int,boolean +==== +---- QUERY +# Test array_contains with double array - searching for NULL +select i, array_contains(arr, cast(NULL as double)) from functional_parquet.iceberg_metadata_alltypes where i=1 +---- RESULTS +1,NULL +---- TYPES +int,boolean +==== diff --git a/tests/query_test/test_array_contains.py b/tests/query_test/test_array_contains.py new file mode 100644 index 0000000000..6af1086a70 --- /dev/null +++ b/tests/query_test/test_array_contains.py @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import absolute_import, division, print_function + +from tests.common.impala_test_suite import ImpalaTestSuite +from tests.common.test_dimensions import ( + add_exec_option_dimension, + create_exec_option_dimension_from_dict, + create_client_protocol_dimension, + orc_schema_resolution_constraint) + +ORC_RESOLUTION_DIMS = [0, 1] + + +class TestArrayContains(ImpalaTestSuite): + """Functional tests for array_contains function.""" + @classmethod + def add_test_dimensions(cls): + super(TestArrayContains, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension( + create_exec_option_dimension_from_dict({ + 'disable_codegen': ['False', 'True'], + # The below two options are set to prevent the planner from disabling codegen + # because of the small data size even when 'disable_codegen' is False. + 'disable_codegen_rows_threshold': [0], + 'exec_single_node_rows_threshold': [0]})) + # Must declare 'orc_schema_resolution' using 'add_exec_option_dimension' so that + # 'orc_schema_resolution_constraint' can catch it. + add_exec_option_dimension(cls, 'orc_schema_resolution', ORC_RESOLUTION_DIMS) + cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension()) + cls.ImpalaTestMatrix.add_constraint(lambda v: + v.get_value('table_format').file_format in ['parquet', 'orc']) + cls.ImpalaTestMatrix.add_constraint(orc_schema_resolution_constraint) + + def test_array_contains(self, vector): + """Queries that test array_contains function""" + self.run_test_case('QueryTest/array-contains', vector)