diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index 83eb522484ba0..fec7b59937dae 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -159,6 +159,7 @@ set(PARQUET_SRCS arrow/schema_internal.cc arrow/writer.cc bloom_filter.cc + bloom_filter_builder.cc bloom_filter_reader.cc column_reader.cc column_scanner.cc @@ -367,7 +368,7 @@ install(FILES "${CMAKE_CURRENT_BINARY_DIR}/parquet_version.h" add_parquet_test(internals-test SOURCES bloom_filter_test.cc - bloom_filter_reader_test.cc + bloom_filter_reader_writer_test.cc properties_test.cc statistics_test.cc encoding_test.cc diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index f80ab83c8607f..7d0b2dca1c8a9 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -67,6 +67,8 @@ #include "parquet/arrow/schema.h" #include "parquet/arrow/test_util.h" #include "parquet/arrow/writer.h" +#include "parquet/bloom_filter.h" +#include "parquet/bloom_filter_reader.h" #include "parquet/column_writer.h" #include "parquet/file_writer.h" #include "parquet/page_index.h" @@ -5670,7 +5672,7 @@ auto encode_double = [](double value) { } // namespace -class ParquetPageIndexRoundTripTest : public ::testing::Test { +class TestingWithPageIndex { public: void WriteFile(const std::shared_ptr& writer_properties, const std::shared_ptr<::arrow::Table>& table) { @@ -5694,6 +5696,13 @@ class ParquetPageIndexRoundTripTest : public ::testing::Test { ASSERT_OK_AND_ASSIGN(buffer_, sink->Finish()); } + protected: + std::shared_ptr buffer_; +}; + +class ParquetPageIndexRoundTripTest : public ::testing::Test, + public TestingWithPageIndex { + public: void ReadPageIndexes(int expect_num_row_groups, int expect_num_pages, const std::set& expect_columns_without_index = {}) { auto read_properties = default_arrow_reader_properties(); @@ -5762,7 +5771,6 @@ class ParquetPageIndexRoundTripTest : public ::testing::Test { } protected: - std::shared_ptr buffer_; std::vector column_indexes_; }; @@ -5998,5 +6006,244 @@ TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) { /*null_counts=*/{0}})); } +class ParquetBloomFilterRoundTripTest : public ::testing::Test, + public TestingWithPageIndex { + public: + void ReadBloomFilters(int expect_num_row_groups, + const std::set& expect_columns_without_filter = {}) { + auto reader = ParquetFileReader::Open(std::make_shared(buffer_)); + + auto metadata = reader->metadata(); + ASSERT_EQ(expect_num_row_groups, metadata->num_row_groups()); + + auto& bloom_filter_reader = reader->GetBloomFilterReader(); + + for (int rg = 0; rg < metadata->num_row_groups(); ++rg) { + auto row_group_reader = bloom_filter_reader.RowGroup(rg); + ASSERT_NE(row_group_reader, nullptr); + + for (int col = 0; col < metadata->num_columns(); ++col) { + bool expect_no_bloom_filter = expect_columns_without_filter.find(col) != + expect_columns_without_filter.cend(); + + auto bloom_filter = row_group_reader->GetColumnBloomFilter(col); + if (expect_no_bloom_filter) { + ASSERT_EQ(nullptr, bloom_filter); + } else { + ASSERT_NE(nullptr, bloom_filter); + bloom_filters_.push_back(std::move(bloom_filter)); + } + } + } + } + + template + void VerifyBloomFilterContains(const BloomFilter* bloom_filter, + const ::arrow::ChunkedArray& chunked_array) { + for (auto value : ::arrow::stl::Iterate(chunked_array)) { + if (value == std::nullopt) { + continue; + } + 
EXPECT_TRUE(bloom_filter->FindHash(bloom_filter->Hash(value.value()))); + } + } + + template + void VerifyBloomFilterNotContains(const BloomFilter* bloom_filter, + const ::arrow::ChunkedArray& chunked_array) { + for (auto value : ::arrow::stl::Iterate(chunked_array)) { + if (value == std::nullopt) { + continue; + } + EXPECT_FALSE(bloom_filter->FindHash(bloom_filter->Hash(value.value()))); + } + } + + protected: + std::vector> bloom_filters_; +}; + +TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTrip) { + auto schema = ::arrow::schema( + {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())}); + BloomFilterOptions options; + options.ndv = 10; + auto writer_properties = WriterProperties::Builder() + .enable_bloom_filter_options(options, "c0") + ->enable_bloom_filter_options(options, "c1") + ->max_row_group_length(4) + ->build(); + auto table = ::arrow::TableFromJSON(schema, {R"([ + [1, "a"], + [2, "b"], + [3, "c"], + [null, "d"], + [5, null], + [6, "f"] + ])"}); + WriteFile(writer_properties, table); + + ReadBloomFilters(/*expect_num_row_groups=*/2); + ASSERT_EQ(4, bloom_filters_.size()); + std::vector row_group_row_count{4, 2}; + int64_t current_row = 0; + int64_t bloom_filter_idx = 0; // current index in `bloom_filters_` + for (int64_t row_group_id = 0; row_group_id < 2; ++row_group_id) { + { + // The bloom filter for same column in another row-group. + int64_t bloom_filter_idx_another_rg = + row_group_id == 0 ? bloom_filter_idx + 2 : bloom_filter_idx - 2; + ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]); + auto col = table->column(0)->Slice(current_row, row_group_row_count[row_group_id]); + VerifyBloomFilterContains<::arrow::Int64Type>( + bloom_filters_[bloom_filter_idx].get(), *col); + VerifyBloomFilterNotContains<::arrow::Int64Type>( + bloom_filters_[bloom_filter_idx_another_rg].get(), *col); + ++bloom_filter_idx; + } + { + int64_t bloom_filter_idx_another_rg = + row_group_id == 0 ? bloom_filter_idx + 2 : bloom_filter_idx - 2; + ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]); + auto col = table->column(1)->Slice(current_row, row_group_row_count[row_group_id]); + VerifyBloomFilterContains<::arrow::StringType>( + bloom_filters_[bloom_filter_idx].get(), *col); + VerifyBloomFilterNotContains<::arrow::StringType>( + bloom_filters_[bloom_filter_idx_another_rg].get(), *col); + ++bloom_filter_idx; + } + current_row += row_group_row_count[row_group_id]; + } +} + +TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTripDictionary) { + auto origin_schema = ::arrow::schema( + {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())}); + auto schema = ::arrow::schema( + {::arrow::field("c0", ::arrow::dictionary(::arrow::int64(), ::arrow::int64())), + ::arrow::field("c1", ::arrow::dictionary(::arrow::int64(), ::arrow::utf8()))}); + bloom_filters_.clear(); + BloomFilterOptions options; + options.ndv = 10; + auto writer_properties = WriterProperties::Builder() + .enable_bloom_filter_options(options, "c0") + ->enable_bloom_filter_options(options, "c1") + ->max_row_group_length(4) + ->build(); + std::vector contents = {R"([ + [1, "a"], + [2, "b"], + [3, "c"], + [null, "d"], + [5, null], + [6, "f"] + ])"}; + auto dict_encoded_table = ::arrow::TableFromJSON(schema, contents); + // using non_dict_table to adapt some interface which doesn't support dictionary. 
+ auto table = ::arrow::TableFromJSON(origin_schema, contents); + WriteFile(writer_properties, dict_encoded_table); + + ReadBloomFilters(/*expect_num_row_groups=*/2); + ASSERT_EQ(4, bloom_filters_.size()); + std::vector row_group_row_count{4, 2}; + int64_t current_row = 0; + int64_t bloom_filter_idx = 0; // current index in `bloom_filters_` + for (int64_t row_group_id = 0; row_group_id < 2; ++row_group_id) { + { + // The bloom filter for same column in another row-group. + int64_t bloom_filter_idx_another_rg = + row_group_id == 0 ? bloom_filter_idx + 2 : bloom_filter_idx - 2; + ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]); + auto col = table->column(0)->Slice(current_row, row_group_row_count[row_group_id]); + VerifyBloomFilterContains<::arrow::Int64Type>( + bloom_filters_[bloom_filter_idx].get(), *col); + VerifyBloomFilterNotContains<::arrow::Int64Type>( + bloom_filters_[bloom_filter_idx_another_rg].get(), *col); + ++bloom_filter_idx; + } + { + int64_t bloom_filter_idx_another_rg = + row_group_id == 0 ? bloom_filter_idx + 2 : bloom_filter_idx - 2; + ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]); + auto col = table->column(1)->Slice(current_row, row_group_row_count[row_group_id]); + VerifyBloomFilterContains<::arrow::StringType>( + bloom_filters_[bloom_filter_idx].get(), *col); + VerifyBloomFilterNotContains<::arrow::StringType>( + bloom_filters_[bloom_filter_idx_another_rg].get(), *col); + ++bloom_filter_idx; + } + current_row += row_group_row_count[row_group_id]; + } +} + +TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTripWithOneFilter) { + auto schema = ::arrow::schema( + {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())}); + BloomFilterOptions options; + options.ndv = 10; + auto writer_properties = WriterProperties::Builder() + .enable_bloom_filter_options(options, "c0") + ->disable_bloom_filter("c1") + ->max_row_group_length(4) + ->build(); + auto table = ::arrow::TableFromJSON(schema, {R"([ + [1, "a"], + [2, "b"], + [3, "c"], + [null, "d"], + [5, null], + [6, "f"] + ])"}); + WriteFile(writer_properties, table); + + ReadBloomFilters(/*expect_num_row_groups=*/2, /*expect_columns_without_filter=*/{1}); + ASSERT_EQ(2, bloom_filters_.size()); + std::vector row_group_row_count{4, 2}; + int64_t current_row = 0; + int64_t bloom_filter_idx = 0; // current index in `bloom_filters_` + for (int64_t row_group_id = 0; row_group_id < 2; ++row_group_id) { + { + ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]); + auto col = table->column(0)->Slice(current_row, row_group_row_count[row_group_id]); + VerifyBloomFilterContains<::arrow::Int64Type>( + bloom_filters_[bloom_filter_idx].get(), *col); + ++bloom_filter_idx; + } + current_row += row_group_row_count[row_group_id]; + } +} + +TEST_F(ParquetBloomFilterRoundTripTest, ThrowForBoolean) { + auto schema = ::arrow::schema({::arrow::field("boolean_col", ::arrow::boolean())}); + BloomFilterOptions options; + options.ndv = 10; + auto writer_properties = WriterProperties::Builder() + .enable_bloom_filter_options(options, "boolean_col") + ->max_row_group_length(4) + ->build(); + auto table = ::arrow::TableFromJSON(schema, {R"([ + [true], + [null], + [false] + ])"}); + std::shared_ptr parquet_schema; + auto arrow_writer_properties = default_arrow_writer_properties(); + ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties, + *arrow_writer_properties, &parquet_schema)); + auto schema_node = std::static_pointer_cast(parquet_schema->schema_root()); + + // Write table to buffer. 
+  auto sink = CreateOutputStream();
+  auto pool = ::arrow::default_memory_pool();
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
+                             &arrow_writer));
+  auto s = arrow_writer->WriteTable(*table);
+  EXPECT_TRUE(s.IsIOError());
+  EXPECT_THAT(s.message(),
+              ::testing::HasSubstr("BloomFilterBuilder does not support boolean type"));
+}
+
 }  // namespace arrow
 }  // namespace parquet
diff --git a/cpp/src/parquet/bloom_filter.h b/cpp/src/parquet/bloom_filter.h
index 82172f363ba7e..804940f294d61 100644
--- a/cpp/src/parquet/bloom_filter.h
+++ b/cpp/src/parquet/bloom_filter.h
@@ -106,6 +106,34 @@ class PARQUET_EXPORT BloomFilter {
   /// @return hash result.
   virtual uint64_t Hash(const FLBA* value, uint32_t len) const = 0;
 
+  // Variants taking a const reference argument, to facilitate use in template code.
+
+  /// Compute hash for ByteArray value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  uint64_t Hash(const ByteArray& value) const { return Hash(&value); }
+
+  /// Compute hash for fixed byte array value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @param type_len the value length.
+  uint64_t Hash(const FLBA& value, uint32_t type_len) const {
+    return Hash(&value, type_len);
+  }
+
+  /// Compute hash for Int96 value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  uint64_t Hash(const Int96& value) const { return Hash(&value); }
+
+  /// Compute hash for std::string_view value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  uint64_t Hash(std::string_view value) const {
+    ByteArray ba(value);
+    return Hash(&ba);
+  }
+
   /// Batch compute hashes for 32 bits values by using its plain encoding result.
   ///
   /// @param values values a pointer to the values to hash.
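For context, a sketch of how these const-reference overloads are meant to be used from templated code. The helper below is hypothetical and not part of this patch; it only assumes the `Hash`/`FindHash` members shown above:

```cpp
// Hypothetical helper, not part of this patch. With the const-reference
// overloads above, templated code can hash ByteArray, Int96, std::string_view
// and the primitive types uniformly; FLBA still needs its explicit length.
template <typename T>
bool MightContain(const parquet::BloomFilter& filter, const T& value) {
  return filter.FindHash(filter.Hash(value));
}
```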
+ +#include "parquet/bloom_filter_builder.h" + +#include +#include +#include + +#include "arrow/io/interfaces.h" + +#include "parquet/bloom_filter.h" +#include "parquet/exception.h" +#include "parquet/metadata.h" +#include "parquet/properties.h" + +namespace parquet::internal { + +namespace { +/// Column encryption for bloom filter is not implemented yet. +class BloomFilterBuilderImpl : public BloomFilterBuilder { + public: + explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema, + const WriterProperties* properties) + : schema_(schema), properties_(properties) {} + BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete; + BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default; + + /// Append a new row group to host all incoming bloom filters. + void AppendRowGroup() override; + + BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override; + + /// Serialize all bloom filters with header and bitset in the order of row group and + /// column id. The side effect is that it deletes all bloom filters after they have + /// been flushed. + void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) override; + + private: + /// Make sure column ordinal is not out of bound and the builder is in good state. + void CheckState(int32_t column_ordinal) const { + if (finished_) { + throw ParquetException("BloomFilterBuilder is already finished."); + } + if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) { + throw ParquetException("Invalid column ordinal: ", column_ordinal); + } + if (file_bloom_filters_.empty()) { + throw ParquetException("No row group appended to BloomFilterBuilder."); + } + if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) { + throw ParquetException("BloomFilterBuilder does not support boolean type."); + } + } + + const SchemaDescriptor* schema_; + const WriterProperties* properties_; + bool finished_ = false; + + using RowGroupBloomFilters = std::map>; + // Using unique_ptr because the `std::unique_ptr` is not copyable. + // MSVC has the issue below: https://github.com/microsoft/STL/issues/1036 + // So we use `std::unique_ptr>` to avoid the issue. + std::vector> file_bloom_filters_; +}; + +void BloomFilterBuilderImpl::AppendRowGroup() { + if (finished_) { + throw ParquetException("Cannot append to a finished BloomFilterBuilder"); + } + file_bloom_filters_.emplace_back(std::make_unique()); +} + +BloomFilter* BloomFilterBuilderImpl::GetOrCreateBloomFilter(int32_t column_ordinal) { + CheckState(column_ordinal); + const ColumnDescriptor* column_descr = schema_->Column(column_ordinal); + // Bloom filter does not support boolean type, and this should be checked in + // CheckState() already. + DCHECK_NE(column_descr->physical_type(), Type::BOOLEAN); + auto bloom_filter_options_opt = properties_->bloom_filter_options(column_descr->path()); + if (bloom_filter_options_opt == std::nullopt) { + return nullptr; + } + const BloomFilterOptions& bloom_filter_options = *bloom_filter_options_opt; + // CheckState() should have checked that file_bloom_filters_ is not empty. 
+  DCHECK(!file_bloom_filters_.empty());
+  RowGroupBloomFilters& row_group_bloom_filter = *file_bloom_filters_.back();
+  auto iter = row_group_bloom_filter.find(column_ordinal);
+  if (iter == row_group_bloom_filter.end()) {
+    auto block_split_bloom_filter =
+        std::make_unique<BlockSplitBloomFilter>(properties_->memory_pool());
+    block_split_bloom_filter->Init(BlockSplitBloomFilter::OptimalNumOfBytes(
+        bloom_filter_options.ndv, bloom_filter_options.fpp));
+    auto insert_result = row_group_bloom_filter.emplace(
+        column_ordinal, std::move(block_split_bloom_filter));
+    iter = insert_result.first;
+  }
+  if (iter->second == nullptr) {
+    throw ParquetException("Bloom filter should not be null for column ",
+                           column_descr->path());
+  }
+  return iter->second.get();
+}
+
+void BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink,
+                                     BloomFilterLocation* location) {
+  if (finished_) {
+    throw ParquetException("Cannot write a finished BloomFilterBuilder");
+  }
+  finished_ = true;
+
+  for (size_t row_group_ordinal = 0; row_group_ordinal < file_bloom_filters_.size();
+       ++row_group_ordinal) {
+    RowGroupBloomFilters& row_group_bloom_filters =
+        *file_bloom_filters_[row_group_ordinal];
+    // The whole row group has no bloom filter.
+    if (row_group_bloom_filters.empty()) {
+      continue;
+    }
+    int num_columns = schema_->num_columns();
+    RowGroupBloomFilterLocation locations;
+
+    // Serialize bloom filters in ascending order of column id.
+    for (auto& [column_id, filter] : row_group_bloom_filters) {
+      if (ARROW_PREDICT_FALSE(filter == nullptr)) {
+        throw ParquetException("Bloom filter is null for column ", column_id);
+      }
+      if (ARROW_PREDICT_FALSE(column_id < 0 || column_id >= num_columns)) {
+        throw ParquetException("Invalid column ordinal when serializing bloom filter: ",
+                               column_id);
+      }
+      PARQUET_ASSIGN_OR_THROW(int64_t offset, sink->Tell());
+      // TODO(GH-43138): Estimate the quality of the bloom filter before writing it.
+      filter->WriteTo(sink);
+      PARQUET_ASSIGN_OR_THROW(int64_t pos, sink->Tell());
+      if (pos - offset > std::numeric_limits<int32_t>::max()) {
+        throw ParquetException("Bloom filter is too large to be serialized, size: ",
+                               pos - offset, " for column ", column_id);
+      }
+      locations[column_id] = IndexLocation{offset, static_cast<int32_t>(pos - offset)};
+    }
+    location->bloom_filter_location.emplace(row_group_ordinal, std::move(locations));
+  }
+}
+}  // namespace
+
+std::unique_ptr<BloomFilterBuilder> BloomFilterBuilder::Make(
+    const SchemaDescriptor* schema, const WriterProperties* properties) {
+  return std::make_unique<BloomFilterBuilderImpl>(schema, properties);
+}
+
+}  // namespace parquet::internal
diff --git a/cpp/src/parquet/bloom_filter_builder.h b/cpp/src/parquet/bloom_filter_builder.h
new file mode 100644
index 0000000000000..90bc18a186def
--- /dev/null
+++ b/cpp/src/parquet/bloom_filter_builder.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/io/type_fwd.h"
+#include "parquet/types.h"
+
+namespace parquet::internal {
+
+/// \brief Interface for collecting the bloom filters of a parquet file.
+///
+/// ```
+/// auto bloom_filter_builder = BloomFilterBuilder::Make(schema, properties);
+/// for (int i = 0; i < num_row_groups; i++) {
+///   bloom_filter_builder->AppendRowGroup();
+///   auto* bloom_filter =
+///       bloom_filter_builder->GetOrCreateBloomFilter(bloom_filter_column);
+///   // Add bloom filter entries in `bloom_filter`.
+///   // ...
+/// }
+/// bloom_filter_builder->WriteTo(sink, location);
+/// ```
+class PARQUET_EXPORT BloomFilterBuilder {
+ public:
+  /// \brief API to create a BloomFilterBuilder.
+  static std::unique_ptr<BloomFilterBuilder> Make(const SchemaDescriptor* schema,
+                                                  const WriterProperties* properties);
+
+  /// Append a new row group to host all incoming bloom filters.
+  ///
+  /// This method must be called before `GetOrCreateBloomFilter` for a new row group.
+  ///
+  /// \throws ParquetException if WriteTo() has been called to flush the bloom filters.
+  virtual void AppendRowGroup() = 0;
+
+  /// \brief Get the BloomFilter for a column ordinal.
+  ///
+  /// \param column_ordinal Column ordinal in the schema, which is only for leaf
+  /// columns.
+  ///
+  /// \return BloomFilter for the column; its memory is owned by the
+  /// BloomFilterBuilder. Returns nullptr if the bloom filter is not enabled for
+  /// the column.
+  ///
+  /// \throws ParquetException if any of the following conditions applies:
+  /// 1) column_ordinal is out of bound.
+  /// 2) `WriteTo()` has been called already.
+  /// 3) `AppendRowGroup()` is not called before `GetOrCreateBloomFilter()`.
+  virtual BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) = 0;
+
+  /// \brief Write the bloom filters to the sink.
+  ///
+  /// The bloom filters cannot be modified after this method is called.
+  ///
+  /// \param[out] sink The output stream to write the bloom filters to.
+  /// \param[out] location The locations of all bloom filters relative to the start
+  /// of the sink.
+  ///
+  /// \throws ParquetException if WriteTo() has already been called to flush the
+  /// bloom filters.
+  virtual void WriteTo(::arrow::io::OutputStream* sink,
+                       BloomFilterLocation* location) = 0;
+
+  virtual ~BloomFilterBuilder() = default;
+};
+
+}  // namespace parquet::internal
diff --git a/cpp/src/parquet/bloom_filter_reader_test.cc b/cpp/src/parquet/bloom_filter_reader_test.cc
deleted file mode 100644
index f732b4a8e22b7..0000000000000
--- a/cpp/src/parquet/bloom_filter_reader_test.cc
+++ /dev/null
@@ -1,83 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
- -#include -#include - -#include "parquet/bloom_filter.h" -#include "parquet/bloom_filter_reader.h" -#include "parquet/file_reader.h" -#include "parquet/test_util.h" - -namespace parquet::test { - -TEST(BloomFilterReader, ReadBloomFilter) { - std::vector files = {"data_index_bloom_encoding_stats.parquet", - "data_index_bloom_encoding_with_length.parquet"}; - for (const auto& test_file : files) { - std::string dir_string(parquet::test::get_data_dir()); - std::string path = dir_string + "/" + test_file; - auto reader = ParquetFileReader::OpenFile(path, /*memory_map=*/false); - auto file_metadata = reader->metadata(); - EXPECT_FALSE(file_metadata->is_encryption_algorithm_set()); - auto& bloom_filter_reader = reader->GetBloomFilterReader(); - auto row_group_0 = bloom_filter_reader.RowGroup(0); - ASSERT_NE(nullptr, row_group_0); - EXPECT_THROW_THAT( - [&]() { bloom_filter_reader.RowGroup(1); }, ParquetException, - ::testing::Property(&ParquetException::what, - ::testing::HasSubstr("Invalid row group ordinal"))); - auto bloom_filter = row_group_0->GetColumnBloomFilter(0); - ASSERT_NE(nullptr, bloom_filter); - EXPECT_THROW_THAT([&]() { row_group_0->GetColumnBloomFilter(1); }, ParquetException, - ::testing::Property(&ParquetException::what, - ::testing::HasSubstr( - "Invalid column index at column ordinal"))); - - // assert exists - { - std::string_view sv = "Hello"; - ByteArray ba{sv}; - EXPECT_TRUE(bloom_filter->FindHash(bloom_filter->Hash(&ba))); - } - - // no exists - { - std::string_view sv = "NOT_EXISTS"; - ByteArray ba{sv}; - EXPECT_FALSE(bloom_filter->FindHash(bloom_filter->Hash(&ba))); - } - } -} - -TEST(BloomFilterReader, FileNotHaveBloomFilter) { - // Can still get a BloomFilterReader and a RowGroupBloomFilter - // reader, but cannot get a non-null BloomFilter. - std::string dir_string(parquet::test::get_data_dir()); - std::string path = dir_string + "/alltypes_plain.parquet"; - auto reader = ParquetFileReader::OpenFile(path, false); - auto file_metadata = reader->metadata(); - EXPECT_FALSE(file_metadata->is_encryption_algorithm_set()); - auto& bloom_filter_reader = reader->GetBloomFilterReader(); - auto row_group_0 = bloom_filter_reader.RowGroup(0); - ASSERT_NE(nullptr, row_group_0); - EXPECT_THROW(bloom_filter_reader.RowGroup(1), ParquetException); - auto bloom_filter = row_group_0->GetColumnBloomFilter(0); - ASSERT_EQ(nullptr, bloom_filter); -} - -} // namespace parquet::test diff --git a/cpp/src/parquet/bloom_filter_reader_writer_test.cc b/cpp/src/parquet/bloom_filter_reader_writer_test.cc new file mode 100644 index 0000000000000..b3cfd9d474abb --- /dev/null +++ b/cpp/src/parquet/bloom_filter_reader_writer_test.cc @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "arrow/testing/gtest_util.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/bloom_filter_builder.h"
+#include "parquet/bloom_filter_reader.h"
+#include "parquet/file_reader.h"
+#include "parquet/test_util.h"
+
+namespace parquet::test {
+
+TEST(BloomFilterReader, ReadBloomFilter) {
+  std::vector<std::string> files = {"data_index_bloom_encoding_stats.parquet",
+                                    "data_index_bloom_encoding_with_length.parquet"};
+  for (const auto& test_file : files) {
+    std::string dir_string(get_data_dir());
+    std::string path = dir_string + "/" + test_file;
+    auto reader = ParquetFileReader::OpenFile(path, /*memory_map=*/false);
+    auto file_metadata = reader->metadata();
+    EXPECT_FALSE(file_metadata->is_encryption_algorithm_set());
+    auto& bloom_filter_reader = reader->GetBloomFilterReader();
+    auto row_group_0 = bloom_filter_reader.RowGroup(0);
+    ASSERT_NE(nullptr, row_group_0);
+    EXPECT_THROW_THAT(
+        [&]() { bloom_filter_reader.RowGroup(1); }, ParquetException,
+        ::testing::Property(&ParquetException::what,
+                            ::testing::HasSubstr("Invalid row group ordinal")));
+    auto bloom_filter = row_group_0->GetColumnBloomFilter(0);
+    ASSERT_NE(nullptr, bloom_filter);
+    EXPECT_THROW_THAT([&]() { row_group_0->GetColumnBloomFilter(1); }, ParquetException,
+                      ::testing::Property(&ParquetException::what,
+                                          ::testing::HasSubstr(
+                                              "Invalid column index at column ordinal")));
+
+    // The hash of an existing value should be found.
+    {
+      std::string_view sv = "Hello";
+      ByteArray ba{sv};
+      EXPECT_TRUE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));
+    }
+
+    // The hash of a missing value should not be found.
+    {
+      std::string_view sv = "NOT_EXISTS";
+      ByteArray ba{sv};
+      EXPECT_FALSE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));
+    }
+  }
+}
+
+TEST(BloomFilterReader, FileNotHaveBloomFilter) {
+  // Can still get a BloomFilterReader and a RowGroupBloomFilter
+  // reader, but cannot get a non-null BloomFilter.
+  std::string dir_string(get_data_dir());
+  std::string path = dir_string + "/alltypes_plain.parquet";
+  auto reader = ParquetFileReader::OpenFile(path, false);
+  auto file_metadata = reader->metadata();
+  EXPECT_FALSE(file_metadata->is_encryption_algorithm_set());
+  auto& bloom_filter_reader = reader->GetBloomFilterReader();
+  auto row_group_0 = bloom_filter_reader.RowGroup(0);
+  ASSERT_NE(nullptr, row_group_0);
+  EXPECT_THROW(bloom_filter_reader.RowGroup(1), ParquetException);
+  auto bloom_filter = row_group_0->GetColumnBloomFilter(0);
+  ASSERT_EQ(nullptr, bloom_filter);
+}
+
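The reader tests above lean on the two basic guarantees of a bloom filter. A minimal standalone sketch of those semantics, assuming the `BlockSplitBloomFilter` constructor and static helpers used elsewhere in this patch:

```cpp
// Sketch only: illustrates probe semantics, not part of this patch.
#include <cassert>
#include <string_view>
#include "arrow/memory_pool.h"
#include "parquet/bloom_filter.h"

void BloomFilterSemanticsSketch() {
  parquet::BlockSplitBloomFilter filter(::arrow::default_memory_pool());
  filter.Init(parquet::BlockSplitBloomFilter::OptimalNumOfBytes(/*ndv=*/100,
                                                                /*fpp=*/0.05));
  parquet::ByteArray hello{std::string_view("Hello")};
  filter.InsertHash(filter.Hash(&hello));
  // No false negatives: an inserted value is always found.
  assert(filter.FindHash(filter.Hash(&hello)));
  // False positives are possible: a never-inserted value is *usually* not
  // found, with a false-positive probability of roughly `fpp`.
  parquet::ByteArray missing{std::string_view("NOT_EXISTS")};
  (void)filter.FindHash(filter.Hash(&missing));
}
```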
+// The schema has two columns; only c1 has a bloom filter.
+TEST(BloomFilterBuilder, BasicRoundTrip) {
+  SchemaDescriptor schema;
+  schema::NodePtr root = schema::GroupNode::Make(
+      "schema", Repetition::REPEATED, {schema::ByteArray("c1"), schema::ByteArray("c2")});
+  schema.Init(root);
+  WriterProperties::Builder properties_builder;
+  BloomFilterOptions bloom_filter_options;
+  bloom_filter_options.ndv = 100;
+  properties_builder.enable_bloom_filter_options(bloom_filter_options, "c1");
+  auto writer_properties = properties_builder.build();
+  auto builder = internal::BloomFilterBuilder::Make(&schema, writer_properties.get());
+
+  auto append_values_to_bloom_filter = [&](const std::vector<uint64_t>& insert_hashes) {
+    builder->AppendRowGroup();
+    auto bloom_filter = builder->GetOrCreateBloomFilter(0);
+    ASSERT_NE(nullptr, bloom_filter);
+    ASSERT_EQ(bloom_filter->GetBitsetSize(),
+              BlockSplitBloomFilter::OptimalNumOfBytes(bloom_filter_options.ndv,
+                                                       bloom_filter_options.fpp));
+    for (uint64_t hash : insert_hashes) {
+      bloom_filter->InsertHash(hash);
+    }
+  };
+  // First row-group
+  append_values_to_bloom_filter({100, 200});
+  // Second row-group
+  append_values_to_bloom_filter({300, 400});
+  auto sink = CreateOutputStream();
+  BloomFilterLocation location;
+  builder->WriteTo(sink.get(), &location);
+  EXPECT_EQ(2, location.bloom_filter_location.size());
+  for (auto& [row_group_id, row_group_bloom_filter] : location.bloom_filter_location) {
+    EXPECT_EQ(1, row_group_bloom_filter.size());
+    EXPECT_TRUE(row_group_bloom_filter.find(0) != row_group_bloom_filter.end());
+    EXPECT_FALSE(row_group_bloom_filter.find(1) != row_group_bloom_filter.end());
+  }
+
+  struct RowGroupBloomFilterCase {
+    int32_t row_group_id;
+    std::vector<uint64_t> exists_hashes;
+    std::vector<uint64_t> unexists_hashes;
+  };
+
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  std::vector<RowGroupBloomFilterCase> cases = {
+      RowGroupBloomFilterCase{/*row_group_id=*/0, /*exists_hashes=*/{100, 200},
+                              /*unexists_hashes=*/{300, 400}},
+      RowGroupBloomFilterCase{/*row_group_id=*/1, /*exists_hashes=*/{300, 400},
+                              /*unexists_hashes=*/{100, 200}}};
+  for (const auto& c : cases) {
+    auto& bloom_filter_location = location.bloom_filter_location[c.row_group_id];
+    int64_t bloom_filter_offset = bloom_filter_location[0].offset;
+    int32_t bloom_filter_length = bloom_filter_location[0].length;
+
+    ReaderProperties reader_properties;
+    ::arrow::io::BufferReader reader(
+        ::arrow::SliceBuffer(buffer, bloom_filter_offset, bloom_filter_length));
+    auto filter = parquet::BlockSplitBloomFilter::Deserialize(reader_properties, &reader);
+    for (uint64_t hash : c.exists_hashes) {
+      EXPECT_TRUE(filter.FindHash(hash));
+    }
+    for (uint64_t hash : c.unexists_hashes) {
+      EXPECT_FALSE(filter.FindHash(hash));
+    }
+  }
+}
+
+TEST(BloomFilterBuilder, InvalidOperations) {
+  SchemaDescriptor schema;
+  schema::NodePtr root = schema::GroupNode::Make(
+      "schema", Repetition::REPEATED, {schema::ByteArray("c1"), schema::Boolean("c2")});
+  schema.Init(root);
+  WriterProperties::Builder properties_builder;
+  BloomFilterOptions bloom_filter_options;
+  bloom_filter_options.ndv = 100;
+  properties_builder.enable_bloom_filter_options(bloom_filter_options, "c1");
+  properties_builder.enable_bloom_filter_options(bloom_filter_options, "c2");
+  auto properties = properties_builder.build();
+  auto builder = internal::BloomFilterBuilder::Make(&schema, properties.get());
+  // AppendRowGroup() has not been called yet, so expect a throw.
+  EXPECT_THROW_THAT(
+      [&]() { builder->GetOrCreateBloomFilter(0); }, ParquetException,
+      ::testing::Property(
+          &ParquetException::what,
+          ::testing::HasSubstr("No row group appended to BloomFilterBuilder")));
+
+  builder->AppendRowGroup();
+  // GetOrCreateBloomFilter() with an out-of-range column ordinal is expected to throw.
+  EXPECT_THROW_THAT([&]() { builder->GetOrCreateBloomFilter(2); }, ParquetException,
+                    ::testing::Property(&ParquetException::what,
+                                        ::testing::HasSubstr("Invalid column ordinal")));
+  // GetOrCreateBloomFilter() on a boolean column is expected to throw.
+  EXPECT_THROW_THAT(
+      [&]() { builder->GetOrCreateBloomFilter(1); }, ParquetException,
+      ::testing::Property(
+          &ParquetException::what,
+          ::testing::HasSubstr("BloomFilterBuilder does not support boolean type")));
+  auto filter = builder->GetOrCreateBloomFilter(0);
+  // Calling GetOrCreateBloomFilter() a second time returns the cached bloom filter.
+  EXPECT_EQ(filter, builder->GetOrCreateBloomFilter(0));
+  auto sink = CreateOutputStream();
+  BloomFilterLocation location;
+  builder->WriteTo(sink.get(), &location);
+  EXPECT_EQ(1, location.bloom_filter_location.size());
+  // Calling WriteTo() more than once is expected to throw.
+  EXPECT_THROW_THAT(
+      [&]() { builder->WriteTo(sink.get(), &location); }, ParquetException,
+      ::testing::Property(
+          &ParquetException::what,
+          ::testing::HasSubstr("Cannot write a finished BloomFilterBuilder")));
+}
+
+}  // namespace parquet::test
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 79dd13ea49f07..44a69536dc591 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -44,7 +44,10 @@
 #include "arrow/util/logging.h"
 #include "arrow/util/rle_encoding_internal.h"
 #include "arrow/util/type_traits.h"
+#include "arrow/util/unreachable.h"
 #include "arrow/visit_array_inline.h"
+#include "arrow/visit_data_inline.h"
+#include "parquet/bloom_filter.h"
 #include "parquet/column_page.h"
 #include "parquet/encoding.h"
 #include "parquet/encryption/encryption_internal.h"
@@ -156,6 +159,8 @@ inline const T* AddIfNotNull(const T* base, int64_t offset) {
   return nullptr;
 }
 
+constexpr int64_t kHashBatchSize = 256;
+
 }  // namespace
 
 LevelEncoder::LevelEncoder() {}
@@ -733,7 +738,8 @@ class ColumnWriterImpl {
  public:
   ColumnWriterImpl(ColumnChunkMetaDataBuilder* metadata,
                    std::unique_ptr<PageWriter> pager, const bool use_dictionary,
-                   Encoding::type encoding, const WriterProperties* properties)
+                   Encoding::type encoding, const WriterProperties* properties,
+                   BloomFilter* bloom_filter)
       : metadata_(metadata),
         descr_(metadata->descr()),
         level_info_(internal::LevelInfo::ComputeLevelInfo(metadata->descr())),
@@ -752,7 +758,8 @@ class ColumnWriterImpl {
         closed_(false),
         fallback_(false),
         definition_levels_sink_(allocator_),
-        repetition_levels_sink_(allocator_) {
+        repetition_levels_sink_(allocator_),
+        bloom_filter_(bloom_filter) {
     definition_levels_rle_ =
        std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
     repetition_levels_rle_ =
@@ -892,6 +899,8 @@ class ColumnWriterImpl {
 
   std::vector<std::unique_ptr<DataPage>> data_pages_;
 
+  BloomFilter* bloom_filter_;
+
  private:
   void InitSinks() {
     definition_levels_sink_.Rewind(0);
@@ -1217,9 +1226,10 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
   TypedColumnWriterImpl(ColumnChunkMetaDataBuilder* metadata,
                         std::unique_ptr<PageWriter> pager, const bool use_dictionary,
-                        Encoding::type encoding, const WriterProperties* properties)
-      : ColumnWriterImpl(metadata, std::move(pager), use_dictionary, encoding,
-                         properties) {
+                        Encoding::type encoding, const WriterProperties* properties,
+                        BloomFilter* 
bloom_filter) + : ColumnWriterImpl(metadata, std::move(pager), use_dictionary, encoding, properties, + bloom_filter) { current_encoder_ = MakeEncoder(ParquetType::type_num, encoding, use_dictionary, descr_, properties->memory_pool()); // We have to dynamic_cast as some compilers don't want to static_cast @@ -1720,6 +1730,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, page_statistics_->Update(values, num_values, num_nulls); } UpdateUnencodedDataBytes(); + UpdateBloomFilter(values, num_values); } /// \brief Write values with spaces and update page statistics accordingly. @@ -1741,8 +1752,10 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, if (num_values != num_spaced_values) { current_value_encoder_->PutSpaced(values, static_cast(num_spaced_values), valid_bits, valid_bits_offset); + UpdateBloomFilterSpaced(values, num_spaced_values, valid_bits, valid_bits_offset); } else { current_value_encoder_->Put(values, static_cast(num_values)); + UpdateBloomFilter(values, num_values); } if (page_statistics_ != nullptr) { page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset, @@ -1750,6 +1763,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, } UpdateUnencodedDataBytes(); } + + void UpdateBloomFilter(const T* values, int64_t num_values); + void UpdateBloomFilterSpaced(const T* values, int64_t num_values, + const uint8_t* valid_bits, int64_t valid_bits_offset); + void UpdateBloomFilterArray(const ::arrow::Array& values); }; template @@ -1797,6 +1815,7 @@ Status TypedColumnWriterImpl::WriteArrowDictionary( auto update_stats = [&](int64_t num_chunk_levels, const std::shared_ptr& chunk_indices) { + DCHECK(page_statistics_ != nullptr || bloom_filter_ != nullptr); // TODO(PARQUET-2068) This approach may make two copies. First, a copy of the // indices array to a (hopefully smaller) referenced indices array. Second, a copy // of the values array to a (probably not smaller) referenced values array. 
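The `UpdateBloomFilter*()` paths added below all follow the same batching pattern: hash up to `kHashBatchSize` values into a fixed-size stack buffer, then insert the hashes in bulk, so arbitrarily large writes never allocate. A simplified sketch of that pattern (illustrative only; the real implementations follow, specialized per physical type):

```cpp
// Simplified form of the batching used by UpdateBloomFilter() below.
#include <algorithm>
#include <array>
#include <cstdint>
#include "parquet/bloom_filter.h"

void UpdateBloomFilterSketch(parquet::BloomFilter* bloom_filter,
                             const int64_t* values, int64_t num_values) {
  constexpr int64_t kHashBatchSize = 256;  // same constant as in this patch
  std::array<uint64_t, kHashBatchSize> hashes;
  for (int64_t i = 0; i < num_values; i += kHashBatchSize) {
    const int batch = static_cast<int>(std::min(kHashBatchSize, num_values - i));
    bloom_filter->Hashes(values + i, batch, hashes.data());   // hash a batch
    bloom_filter->InsertHashes(hashes.data(), batch);         // insert in bulk
  }
}
```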
@@ -1821,11 +1840,15 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
                                 &exec_ctx));
       referenced_dictionary = referenced_dictionary_datum.make_array();
     }
-
-    int64_t non_null_count = chunk_indices->length() - chunk_indices->null_count();
-    page_statistics_->IncrementNullCount(num_chunk_levels - non_null_count);
-    page_statistics_->IncrementNumValues(non_null_count);
-    page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false);
+    if (page_statistics_ != nullptr) {
+      int64_t non_null_count = chunk_indices->length() - chunk_indices->null_count();
+      page_statistics_->IncrementNullCount(num_chunk_levels - non_null_count);
+      page_statistics_->IncrementNumValues(non_null_count);
+      page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false);
+    }
+    if (bloom_filter_ != nullptr) {
+      UpdateBloomFilterArray(*referenced_dictionary);
+    }
   };
 
   int64_t value_offset = 0;
@@ -1842,7 +1865,7 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
                           AddIfNotNull(rep_levels, offset));
     std::shared_ptr<::arrow::Array> writeable_indices =
         indices->Slice(value_offset, batch_num_spaced_values);
-    if (page_statistics_) {
+    if (page_statistics_ || bloom_filter_) {
      update_stats(/*num_chunk_levels=*/batch_size, writeable_indices);
     }
     PARQUET_ASSIGN_OR_THROW(
@@ -2273,6 +2296,9 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDense(
 // ----------------------------------------------------------------------
 // Write Arrow to BYTE_ARRAY
 
+template <>
+void TypedColumnWriterImpl<ByteArrayType>::UpdateBloomFilterArray(
+    const ::arrow::Array& values);
 
 template <>
 Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
@@ -2306,6 +2332,7 @@ Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
       page_statistics_->IncrementNumValues(non_null);
     }
     UpdateUnencodedDataBytes();
+    UpdateBloomFilterArray(*data_slice);
     CommitWriteAndCheckPageLimit(batch_size, batch_num_values, batch_size - non_null,
                                  check_page);
     CheckDictionarySizeLimit();
@@ -2458,12 +2485,163 @@ Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
   return Status::OK();
 }
 
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilter(const T* values,
+                                                     int64_t num_values) {
+  if (bloom_filter_) {
+    std::array<uint64_t, kHashBatchSize> hashes;
+    for (int64_t i = 0; i < num_values; i += kHashBatchSize) {
+      int64_t current_hash_batch_size = std::min(kHashBatchSize, num_values - i);
+      bloom_filter_->Hashes(values + i, static_cast<int>(current_hash_batch_size),
+                            hashes.data());
+      bloom_filter_->InsertHashes(hashes.data(),
+                                  static_cast<int>(current_hash_batch_size));
+    }
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<FLBAType>::UpdateBloomFilter(const FLBA* values,
+                                                        int64_t num_values) {
+  if (bloom_filter_) {
+    std::array<uint64_t, kHashBatchSize> hashes;
+    for (int64_t i = 0; i < num_values; i += kHashBatchSize) {
+      int64_t current_hash_batch_size = std::min(kHashBatchSize, num_values - i);
+      bloom_filter_->Hashes(values + i, descr_->type_length(),
+                            static_cast<int>(current_hash_batch_size), hashes.data());
+      bloom_filter_->InsertHashes(hashes.data(),
+                                  static_cast<int>(current_hash_batch_size));
+    }
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<BooleanType>::UpdateBloomFilter(const bool*, int64_t) {
+  if (ARROW_PREDICT_FALSE(bloom_filter_ != nullptr)) {
+    throw ParquetException("BooleanType does not support bloom filters");
+  }
+}
+
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilterSpaced(const T* values,
+                                                           int64_t num_values,
+                                                           const uint8_t* valid_bits,
+                                                           int64_t valid_bits_offset) {
+  if (bloom_filter_) {
+    std::array<uint64_t, kHashBatchSize> hashes;
+    ::arrow::internal::VisitSetBitRunsVoid(
+        valid_bits, valid_bits_offset, num_values, [&](int64_t position, int64_t length) {
+          for (int64_t i = 0; i < length; i += kHashBatchSize) {
+            auto current_hash_batch_size = std::min(kHashBatchSize, length - i);
+            bloom_filter_->Hashes(values + i + position,
+                                  static_cast<int>(current_hash_batch_size),
+                                  hashes.data());
+            bloom_filter_->InsertHashes(hashes.data(),
+                                        static_cast<int>(current_hash_batch_size));
+          }
+        });
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<BooleanType>::UpdateBloomFilterSpaced(const bool*, int64_t,
+                                                                 const uint8_t*,
+                                                                 int64_t) {
+  // BooleanType does not have a bloom filter currently,
+  // so bloom_filter_ should always be nullptr.
+  if (ARROW_PREDICT_FALSE(bloom_filter_ != nullptr)) {
+    throw ParquetException("BooleanType does not support bloom filters");
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<FLBAType>::UpdateBloomFilterSpaced(const FLBA* values,
+                                                              int64_t num_values,
+                                                              const uint8_t* valid_bits,
+                                                              int64_t valid_bits_offset) {
+  if (bloom_filter_) {
+    std::array<uint64_t, kHashBatchSize> hashes;
+    ::arrow::internal::VisitSetBitRunsVoid(
+        valid_bits, valid_bits_offset, num_values, [&](int64_t position, int64_t length) {
+          for (int64_t i = 0; i < length; i += kHashBatchSize) {
+            auto current_hash_batch_size = std::min(kHashBatchSize, length - i);
+            bloom_filter_->Hashes(values + i + position, descr_->type_length(),
+                                  static_cast<int>(current_hash_batch_size),
+                                  hashes.data());
+            bloom_filter_->InsertHashes(hashes.data(),
+                                        static_cast<int>(current_hash_batch_size));
+          }
+        });
+  }
+}
+
+template <typename ArrayType>
+void UpdateBinaryBloomFilter(BloomFilter* bloom_filter, const ArrayType& array) {
+  // Using a smaller batch size because an extra `byte_arrays` buffer is used.
+  constexpr int64_t kBinaryHashBatchSize = 64;
+  std::array<ByteArray, kBinaryHashBatchSize> byte_arrays;
+  std::array<uint64_t, kBinaryHashBatchSize> hashes;
+  int hashes_idx = 0;
+  auto flush_hashes = [&]() {
+    DCHECK_NE(0, hashes_idx);
+    bloom_filter->Hashes(byte_arrays.data(), static_cast<int>(hashes_idx), hashes.data());
+    bloom_filter->InsertHashes(hashes.data(), static_cast<int>(hashes_idx));
+    hashes_idx = 0;
+  };
+  PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename ArrayType::TypeClass>(
+      *array.data(),
+      [&](std::string_view view) {
+        if (hashes_idx == kBinaryHashBatchSize) {
+          flush_hashes();
+        }
+        byte_arrays[hashes_idx] = view;
+        ++hashes_idx;
+        return Status::OK();
+      },
+      []() { return Status::OK(); }));
+  if (hashes_idx != 0) {
+    flush_hashes();
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<ByteArrayType>::UpdateBloomFilterArray(
+    const ::arrow::Array& values) {
+  if (bloom_filter_) {
+    // TODO(mwish): GH-37832 we currently don't support writing StringView/BinaryView
+    // to parquet files.
+    if (!::arrow::is_base_binary_like(values.type_id())) {
+      throw ParquetException("Only BaseBinaryArray and subclasses supported");
+    }
+
+    if (::arrow::is_binary_like(values.type_id())) {
+      UpdateBinaryBloomFilter(bloom_filter_,
+                              checked_cast<const ::arrow::BinaryArray&>(values));
+    } else {
+      if (!::arrow::is_large_binary_like(values.type_id())) {
+        throw ParquetException("Only LargeBinaryArray and subclasses supported");
+      }
+      UpdateBinaryBloomFilter(bloom_filter_,
+                              checked_cast<const ::arrow::LargeBinaryArray&>(values));
+    }
+  }
+}
+
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilterArray(const ::arrow::Array& values) {
+  // Only the ByteArray type writes ::arrow::Array directly.
+  ::arrow::Unreachable("UpdateBloomFilterArray for non-ByteArray type is unreachable");
+}
+
 // ----------------------------------------------------------------------
 // Dynamic column writer constructor
 
 std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* metadata,
                                                  std::unique_ptr<PageWriter> pager,
-                                                 const WriterProperties* properties) {
+                                                 const WriterProperties* properties,
+                                                 BloomFilter* bloom_filter) {
   const ColumnDescriptor* descr = metadata->descr();
   const bool use_dictionary = properties->dictionary_enabled(descr->path()) &&
                               descr->physical_type() != Type::BOOLEAN;
@@ -2479,32 +2657,37 @@ std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* met
     encoding = properties->dictionary_index_encoding();
   }
   switch (descr->physical_type()) {
-    case Type::BOOLEAN:
+    case Type::BOOLEAN: {
+      if (bloom_filter != nullptr) {
+        throw ParquetException("Bloom filter is not supported for boolean type");
+      }
       return std::make_shared<TypedColumnWriterImpl<BooleanType>>(
-          metadata, std::move(pager), use_dictionary, encoding, properties);
+          metadata, std::move(pager), use_dictionary, encoding, properties,
+          /*bloom_filter=*/nullptr);
+    }
     case Type::INT32:
       return std::make_shared<TypedColumnWriterImpl<Int32Type>>(
-          metadata, std::move(pager), use_dictionary, encoding, properties);
+          metadata, std::move(pager), use_dictionary, encoding, properties, bloom_filter);
     case Type::INT64:
       return std::make_shared<TypedColumnWriterImpl<Int64Type>>(
-          metadata, std::move(pager), use_dictionary, encoding, properties);
+          metadata, std::move(pager), use_dictionary, encoding, properties, bloom_filter);
     case Type::INT96:
       return std::make_shared<TypedColumnWriterImpl<Int96Type>>(
-          metadata, std::move(pager), use_dictionary, encoding, properties);
+          metadata, std::move(pager), use_dictionary, encoding, properties, bloom_filter);
     case Type::FLOAT:
      return std::make_shared<TypedColumnWriterImpl<FloatType>>(
-          metadata, std::move(pager), use_dictionary, encoding, properties);
+          metadata, std::move(pager), use_dictionary, encoding, properties, bloom_filter);
     case Type::DOUBLE:
       return std::make_shared<TypedColumnWriterImpl<DoubleType>>(
-          metadata, std::move(pager), use_dictionary, encoding, properties);
+          metadata, std::move(pager), use_dictionary, encoding, properties, bloom_filter);
     case Type::BYTE_ARRAY:
       return std::make_shared<TypedColumnWriterImpl<ByteArrayType>>(
-          metadata, std::move(pager), use_dictionary, encoding, properties);
+          metadata, std::move(pager), use_dictionary, encoding, properties, bloom_filter);
    case Type::FIXED_LEN_BYTE_ARRAY:
       return std::make_shared<TypedColumnWriterImpl<FLBAType>>(
-          metadata, std::move(pager), use_dictionary, encoding, properties);
+          metadata, std::move(pager), use_dictionary, encoding, properties, bloom_filter);
     default:
-      ParquetException::NYI("type reader not implemented");
+      ParquetException::NYI("column writer not implemented for this Parquet type");
   }
   // Unreachable code, but suppress compiler warning
   return std::shared_ptr<ColumnWriter>(nullptr);
diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h
index bd329d61053f2..40537ed66949b 100644
--- a/cpp/src/parquet/column_writer.h
+++ b/cpp/src/parquet/column_writer.h
@@ -129,7 +129,8 @@ class PARQUET_EXPORT ColumnWriter {
 
   static std::shared_ptr<ColumnWriter> Make(ColumnChunkMetaDataBuilder*,
                                             std::unique_ptr<PageWriter>,
-                                            const WriterProperties* properties);
+                                            const WriterProperties* properties,
+                                            BloomFilter* bloom_filter = NULLPTR);
 
   /// \brief Closes the ColumnWriter, commits any buffered values to pages.
   /// \return Total size of the column in bytes
diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc
index 41c4a3223e10a..dcf394e81e540 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -30,6 +30,8 @@
 #include "arrow/util/config.h"
 #include "arrow/util/key_value_metadata.h"
 
+#include "parquet/bloom_filter.h"
+#include "parquet/bloom_filter_builder.h"
 #include "parquet/column_page.h"
 #include "parquet/column_reader.h"
 #include "parquet/column_writer.h"
@@ -1719,6 +1721,94 @@ TEST(TestColumnWriter, WriteDataPageV2HeaderNullCount) {
   }
 }
 
+template <typename TestType>
+class TestBloomFilterWriter : public TestPrimitiveWriter<TestType> {
+ public:
+  void SetUp() override {
+    TestPrimitiveWriter<TestType>::SetUp();
+    builder_ = nullptr;
+    bloom_filter_ = nullptr;
+  }
+
+  std::shared_ptr<TypedColumnWriter<TestType>> BuildWriterWithBloomFilter(
+      int64_t output_size, const ColumnProperties& column_properties);
+
+  std::unique_ptr<internal::BloomFilterBuilder> builder_;
+  BloomFilter* bloom_filter_{nullptr};
+};
+
+template <typename TestType>
+std::shared_ptr<TypedColumnWriter<TestType>>
+TestBloomFilterWriter<TestType>::BuildWriterWithBloomFilter(
+    int64_t output_size, const ColumnProperties& column_properties) {
+  this->sink_ = CreateOutputStream();
+  WriterProperties::Builder wp_builder;
+  if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY ||
+      column_properties.encoding() == Encoding::RLE_DICTIONARY) {
+    wp_builder.enable_dictionary();
+    wp_builder.dictionary_pagesize_limit(DICTIONARY_PAGE_SIZE);
+  } else {
+    wp_builder.disable_dictionary();
+    wp_builder.encoding(column_properties.encoding());
+  }
+  auto path = this->schema_.Column(0)->path();
+  if (column_properties.bloom_filter_enabled()) {
+    wp_builder.enable_bloom_filter_options(
+        column_properties.bloom_filter_options().value(), path);
+  } else {
+    wp_builder.disable_bloom_filter(path);
+  }
+  this->writer_properties_ = wp_builder.build();
+
+  this->metadata_ =
+      ColumnChunkMetaDataBuilder::Make(this->writer_properties_, this->descr_);
+  std::unique_ptr<PageWriter> pager = PageWriter::Open(
+      this->sink_, column_properties.compression(), this->metadata_.get());
+  builder_ =
+      internal::BloomFilterBuilder::Make(&this->schema_, this->writer_properties_.get());
+  // Initialize the row group.
+  builder_->AppendRowGroup();
+  bloom_filter_ = builder_->GetOrCreateBloomFilter(0);
+  std::shared_ptr<ColumnWriter> writer =
+      ColumnWriter::Make(this->metadata_.get(), std::move(pager),
+                         this->writer_properties_.get(), bloom_filter_);
+  return std::static_pointer_cast<TypedColumnWriter<TestType>>(writer);
+}
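The helper above shows the intended ownership model. A sketch of the same wiring reduced to its essentials (parameter names are illustrative; the builder owns the filters, while `ColumnWriter::Make()` merely borrows a raw pointer that the typed writer populates as values are written):

```cpp
// Sketch only, not part of this patch.
#include <memory>
#include "parquet/bloom_filter_builder.h"
#include "parquet/column_writer.h"

std::shared_ptr<parquet::ColumnWriter> MakeWriterWithFilter(
    const parquet::WriterProperties* properties,
    parquet::ColumnChunkMetaDataBuilder* metadata,
    std::unique_ptr<parquet::PageWriter> pager,
    parquet::internal::BloomFilterBuilder* builder) {
  builder->AppendRowGroup();
  // May be nullptr when the bloom filter is disabled for this column.
  parquet::BloomFilter* filter = builder->GetOrCreateBloomFilter(/*column_ordinal=*/0);
  return parquet::ColumnWriter::Make(metadata, std::move(pager), properties, filter);
}
```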
+
+// Note: BooleanType is excluded.
+using TestBloomFilterTypes = ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType,
+                                              DoubleType, ByteArrayType, FLBAType>;
+
+TYPED_TEST_SUITE(TestBloomFilterWriter, TestBloomFilterTypes);
+
+TYPED_TEST(TestBloomFilterWriter, Basic) {
+  this->GenerateData(SMALL_SIZE);
+  ColumnProperties column_properties;
+  BloomFilterOptions options;
+  options.ndv = SMALL_SIZE;
+  column_properties.set_bloom_filter_options(options);
+
+  auto writer = this->BuildWriterWithBloomFilter(SMALL_SIZE, column_properties);
+  writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
+  writer->Close();
+
+  // Make sure that column values are read correctly
+  this->SetupValuesOut(SMALL_SIZE);
+  this->ReadColumnFully();
+  ASSERT_EQ(SMALL_SIZE, this->values_read_);
+  ASSERT_EQ(this->values_, this->values_out_);
+
+  // Verify the bloom filter
+  for (auto& value : this->values_) {
+    if constexpr (std::is_same_v<TypeParam, FLBAType>) {
+      EXPECT_TRUE(this->bloom_filter_->FindHash(
+          this->bloom_filter_->Hash(&value, this->descr_->type_length())));
+    } else {
+      EXPECT_TRUE(this->bloom_filter_->FindHash(this->bloom_filter_->Hash(value)));
+    }
+  }
+}
+
 using TestInt32Writer = TestPrimitiveWriter<Int32Type>;
 
 TEST_F(TestInt32Writer, NoWriteKeyValueMetadata) {
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index f80a095a13587..a6d8115434ce0 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -25,6 +25,7 @@
 
 #include "arrow/util/key_value_metadata.h"
 #include "arrow/util/logging.h"
+#include "parquet/bloom_filter_builder.h"
 #include "parquet/column_writer.h"
 #include "parquet/encryption/encryption_internal.h"
 #include "parquet/encryption/internal_file_encryptor.h"
@@ -91,7 +92,8 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
                      RowGroupMetaDataBuilder* metadata, int16_t row_group_ordinal,
                      const WriterProperties* properties, bool buffered_row_group = false,
                      InternalFileEncryptor* file_encryptor = nullptr,
-                     PageIndexBuilder* page_index_builder = nullptr)
+                     PageIndexBuilder* page_index_builder = nullptr,
+                     internal::BloomFilterBuilder* bloom_filter_builder = nullptr)
       : sink_(std::move(sink)),
         metadata_(metadata),
         properties_(properties),
@@ -103,7 +105,8 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
         num_rows_(0),
         buffered_row_group_(buffered_row_group),
         file_encryptor_(file_encryptor),
-        page_index_builder_(page_index_builder) {
+        page_index_builder_(page_index_builder),
+        bloom_filter_builder_(bloom_filter_builder) {
     if (buffered_row_group) {
       InitColumns();
     } else {
@@ -230,6 +233,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
   bool buffered_row_group_;
   InternalFileEncryptor* file_encryptor_;
   PageIndexBuilder* page_index_builder_;
+  internal::BloomFilterBuilder* bloom_filter_builder_;
 
   void CheckRowsWritten() const {
     // verify when only one column is written at a time
@@ -279,7 +283,10 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
     auto oi_builder = page_index_builder_ && column_properties.page_index_enabled()
                           ? page_index_builder_->GetOffsetIndexBuilder(column_ordinal)
                           : nullptr;
-
+    BloomFilter* bloom_filter =
+        bloom_filter_builder_ && column_properties.bloom_filter_enabled()
+            ? bloom_filter_builder_->GetOrCreateBloomFilter(column_ordinal)
+            : nullptr;
     const CodecOptions* codec_options = column_properties.codec_options()
                                             ? column_properties.codec_options().get()
                                             : nullptr;
@@ -293,7 +300,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
         static_cast<int16_t>(column_ordinal), properties_->memory_pool(),
         buffered_row_group_, meta_encryptor, data_encryptor,
         properties_->page_checksum_enabled(), ci_builder, oi_builder, *codec_options);
-    return ColumnWriter::Make(col_meta, std::move(pager), properties_);
+    return ColumnWriter::Make(col_meta, std::move(pager), properties_, bloom_filter);
   }
 
   // If buffered_row_group_ is false, only column_writers_[0] is used as current writer.
@@ -331,6 +338,13 @@ class FileSerializer : public ParquetFileWriter::Contents {
     }
     row_group_writer_.reset();
 
+    // In the Parquet format, bloom filter data may be stored before the page
+    // indexes after all row groups, or between row groups. We choose to store it
+    // before the page indexes after all row groups. Putting all bloom filters
+    // together also provides a good chance to coalesce I/Os of different bloom
+    // filters, especially when only one column has enabled them, which is the
+    // common case.
+    WriteBloomFilter();
     WritePageIndex();
 
     // Write magic bytes and metadata
@@ -375,9 +389,12 @@ class FileSerializer : public ParquetFileWriter::Contents {
     if (page_index_builder_) {
       page_index_builder_->AppendRowGroup();
     }
+    if (bloom_filter_builder_) {
+      bloom_filter_builder_->AppendRowGroup();
+    }
     std::unique_ptr<RowGroupWriter::Contents> contents(new RowGroupSerializer(
         sink_, rg_metadata, row_group_ordinal, properties_.get(), buffered_row_group,
-        file_encryptor_.get(), page_index_builder_.get()));
+        file_encryptor_.get(), page_index_builder_.get(), bloom_filter_builder_.get()));
     row_group_writer_ = std::make_unique<RowGroupWriter>(std::move(contents));
     return row_group_writer_.get();
   }
@@ -464,6 +481,21 @@ class FileSerializer : public ParquetFileWriter::Contents {
     }
   }
 
+  void WriteBloomFilter() {
+    if (bloom_filter_builder_ != nullptr) {
+      if (properties_->file_encryption_properties()) {
+        ParquetException::NYI("Encryption is not supported with bloom filter");
+      }
+      // Serialize the bloom filters after all row groups have been written and
+      // report their locations to the file metadata.
+      BloomFilterLocation bloom_filter_location;
+      bloom_filter_builder_->WriteTo(sink_.get(), &bloom_filter_location);
+      metadata_->SetBloomFilterLocation(bloom_filter_location);
+      // Release the memory of the bloom filters.
+      bloom_filter_builder_ = nullptr;
+    }
+  }
+
   std::shared_ptr<ArrowOutputStream> sink_;
   bool is_open_;
   const std::shared_ptr<WriterProperties> properties_;
@@ -474,6 +506,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
   std::unique_ptr<RowGroupWriter> row_group_writer_;
   std::unique_ptr<PageIndexBuilder> page_index_builder_;
   std::unique_ptr<InternalFileEncryptor> file_encryptor_;
+  std::unique_ptr<internal::BloomFilterBuilder> bloom_filter_builder_;
 
   void StartFile() {
     auto file_encryption_properties = properties_->file_encryption_properties();
@@ -511,7 +544,10 @@ class FileSerializer : public ParquetFileWriter::Contents {
         PARQUET_THROW_NOT_OK(sink_->Write(kParquetMagic, 4));
       }
     }
-
+    if (properties_->bloom_filter_enabled()) {
+      bloom_filter_builder_ =
+          internal::BloomFilterBuilder::Make(schema(), properties_.get());
+    }
     if (properties_->page_index_enabled()) {
       page_index_builder_ = PageIndexBuilder::Make(&schema_, file_encryptor_.get());
     }
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index 9b53da021f52e..9c8f645941cb9 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -1931,36 +1931,61 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
   }
 
   void SetPageIndexLocation(const PageIndexLocation& location) {
-    auto set_index_location =
+    auto set_index_location = [this](size_t row_group_ordinal,
+                                     const FileIndexLocation& file_index_location,
+                                     bool column_index) {
+      auto& row_group_metadata = this->row_groups_.at(row_group_ordinal);
+      auto iter = file_index_location.find(row_group_ordinal);
+      if (iter != file_index_location.cend()) {
+        const auto& row_group_index_location = iter->second;
+        for (size_t i = 0; i < row_group_index_location.size(); ++i) {
+          if (i >= row_group_metadata.columns.size()) {
+            throw ParquetException("Cannot find metadata for column ordinal ", i);
+          }
+          auto& column_metadata = row_group_metadata.columns.at(i);
+          const auto& index_location = row_group_index_location.at(i);
+          if (index_location.has_value()) {
+            if (column_index) {
+              column_metadata.__set_column_index_offset(index_location->offset);
+              column_metadata.__set_column_index_length(index_location->length);
+            } else {
+              column_metadata.__set_offset_index_offset(index_location->offset);
+              column_metadata.__set_offset_index_length(index_location->length);
+            }
+          }
+        }
+      }
+    };
+
+    for (size_t i = 0; i < row_groups_.size(); ++i) {
+      set_index_location(i, location.column_index_location, true);
+      set_index_location(i, location.offset_index_location, false);
+    }
+  }
+
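For illustration, the shape of the data flowing from `BloomFilterBuilder::WriteTo()` into `SetBloomFilterLocation()` below, using the alias types declared in metadata.h later in this diff (the offsets are made up):

```cpp
// Sketch only: one row group, one column with a serialized bloom filter.
parquet::BloomFilterLocation location;
parquet::RowGroupBloomFilterLocation rg0;
// Column 0's filter was written at byte offset 100 and is 512 bytes long.
rg0[0] = parquet::IndexLocation{/*offset=*/100, /*length=*/512};
location.bloom_filter_location.emplace(/*row_group_ordinal=*/0, std::move(rg0));
```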
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index 9a3964f7d6574..cd1da0725ca48 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -136,7 +136,6 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   const std::string& file_path() const;
 
   // column metadata
-  bool is_metadata_set() const;
   Type::type type() const;
   int64_t num_values() const;
   std::shared_ptr<schema::ColumnPath> path_in_schema() const;
@@ -499,21 +498,40 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder {
   std::unique_ptr<RowGroupMetaDataBuilderImpl> impl_;
 };
 
+/// Alias type for the page index locations of a row group. The locations
+/// are keyed by column ordinal. If a column does not have a page index,
+/// its entry is set to std::nullopt.
+using RowGroupIndexLocation = std::vector<std::optional<IndexLocation>>;
+
+/// Alias type for the bloom filter locations of a row group. The locations
+/// are keyed by column ordinal.
+///
+/// The number of columns with a bloom filter is expected to be small
+/// relative to the overall number of columns, so a map is used.
+using RowGroupBloomFilterLocation = std::map<int32_t, IndexLocation>;
+
+/// Alias type for the page index locations of a parquet file. The
+/// locations are keyed by row group ordinal.
+using FileIndexLocation = std::map<size_t, RowGroupIndexLocation>;
+
+/// Alias type for the bloom filter locations of a parquet file. The
+/// locations are keyed by row group ordinal.
+using FileBloomFilterLocation = std::map<size_t, RowGroupBloomFilterLocation>;
+
 /// \brief Public struct for location to all page indexes in a parquet file.
 struct PageIndexLocation {
-  /// Alias type of page index location of a row group. The index location
-  /// is located by column ordinal. If the column does not have the page index,
-  /// its value is set to std::nullopt.
-  using RowGroupIndexLocation = std::vector<std::optional<IndexLocation>>;
-  /// Alias type of page index location of a parquet file. The index location
-  /// is located by the row group ordinal.
-  using FileIndexLocation = std::map<size_t, RowGroupIndexLocation>;
   /// Row group column index locations which uses row group ordinal as the key.
   FileIndexLocation column_index_location;
   /// Row group offset index locations which uses row group ordinal as the key.
   FileIndexLocation offset_index_location;
 };
 
+/// \brief Public struct for location to all bloom filters in a parquet file.
+struct BloomFilterLocation {
+  /// Row group bloom filter locations which use the row group ordinal as the key.
+  FileBloomFilterLocation bloom_filter_location;
+};
+
 class PARQUET_EXPORT FileMetaDataBuilder {
  public:
   // API convenience to get a MetaData builder
@@ -528,6 +546,9 @@ class PARQUET_EXPORT FileMetaDataBuilder {
   // Update location to all page indexes in the parquet file
   void SetPageIndexLocation(const PageIndexLocation& location);
 
+  // Update location to all bloom filters in the parquet file.
+  void SetBloomFilterLocation(const BloomFilterLocation& location);
+
   // Complete the Thrift structure
   std::unique_ptr<FileMetaData> Finish(
       const std::shared_ptr<const KeyValueMetadata>& key_value_metadata = NULLPTR);
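The comment on `RowGroupBloomFilterLocation` motivates the asymmetry between the two per-row-group aliases: page indexes get a dense vector, bloom filters a sparse map. A self-contained illustration of the two shapes (standard library only; the local `IndexLocation` struct mirrors the offset/length pair above, and all offsets are invented for the example):

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <optional>
#include <vector>

// Mirrors parquet::IndexLocation: where a serialized structure lives.
struct IndexLocation {
  int64_t offset;
  int32_t length;
};

int main() {
  // Page indexes: every column has a slot, so a dense vector indexed by
  // column ordinal is used; columns without an index hold std::nullopt.
  std::vector<std::optional<IndexLocation>> page_index_locations(3);
  page_index_locations[0] = IndexLocation{1000, 64};
  page_index_locations[2] = IndexLocation{1064, 48};  // column 1 has none

  // Bloom filters: typically enabled for only a few columns, so a sparse
  // map keyed by column ordinal avoids paying for the absent columns.
  std::map<int32_t, IndexLocation> bloom_filter_locations;
  bloom_filter_locations[2] = IndexLocation{2000, 4096};

  for (const auto& [column_id, loc] : bloom_filter_locations) {
    std::cout << "column " << column_id << " bloom filter at offset "
              << loc.offset << ", length " << loc.length << "\n";
  }
  return 0;
}
```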
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 19436b84a379b..f800f744b5257 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -167,6 +167,32 @@ static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::UNCOMPRESSED;
 static constexpr bool DEFAULT_IS_PAGE_INDEX_ENABLED = true;
 static constexpr SizeStatisticsLevel DEFAULT_SIZE_STATISTICS_LEVEL =
     SizeStatisticsLevel::PageAndColumnChunk;
 
+static constexpr int32_t DEFAULT_BLOOM_FILTER_NDV = 1024 * 1024;
+static constexpr double DEFAULT_BLOOM_FILTER_FPP = 0.05;
+
+struct PARQUET_EXPORT BloomFilterOptions {
+  /// Expected number of distinct values to be inserted into the bloom filter.
+  ///
+  /// Bloom filters are most beneficial for columns with large cardinality, so a
+  /// good heuristic is to set ndv to the number of rows. However, setting a
+  /// smaller ndv, if it is known in advance, reduces the on-disk size. For very
+  /// small ndv values a bloom filter is probably not worth the overhead anyway.
+  ///
+  /// Increasing this value (without increasing fpp) will result in an increase
+  /// in disk or memory size.
+  int32_t ndv = DEFAULT_BLOOM_FILTER_NDV;
+  /// False-positive probability of the bloom filter.
+  ///
+  /// The bloom filter data structure trades disk and memory space against fpp:
+  /// the smaller the fpp, the more memory and disk space is required. Setting
+  /// it to a reasonable value, e.g. 0.1, 0.05, or 0.001, is recommended.
+  ///
+  /// Setting fpp to a very small number diminishes the value of the filter
+  /// itself, as the bitset can become larger than just storing the values.
+  /// Users are also expected to set ndv, if it is known in advance, in order
+  /// to largely reduce space usage.
+  double fpp = DEFAULT_BLOOM_FILTER_FPP;
+};
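To make the ndv/fpp trade-off above concrete: Parquet's split-block bloom filter sets eight bits per inserted value (one per 32-bit word of a 256-bit block), which leads to a sizing formula of roughly m = -8 * ndv / ln(1 - fpp^(1/8)) bits. The sketch below is standalone arithmetic; the helper name is invented, but the formula matches the heuristic used internally by BlockSplitBloomFilter, ignoring its clamping and rounding of the final size.

```cpp
#include <cmath>
#include <cstdint>
#include <cstdio>

// Approximate size in bits of a split-block bloom filter for `ndv` distinct
// values at false-positive probability `fpp` (helper name is illustrative).
double ApproxBloomFilterBits(uint32_t ndv, double fpp) {
  return -8.0 * ndv / std::log(1.0 - std::pow(fpp, 1.0 / 8.0));
}

int main() {
  // At the defaults above (ndv = 1M, fpp = 0.05) the filter needs roughly
  // 0.9 MiB; tightening fpp to 0.001 about doubles that, which is why the
  // docs recommend moderate fpp values together with an accurate ndv.
  const uint32_t ndv = 1024 * 1024;
  for (double fpp : {0.1, 0.05, 0.01, 0.001}) {
    const double mib = ApproxBloomFilterBits(ndv, fpp) / 8.0 / 1024.0 / 1024.0;
    std::printf("fpp = %.3f -> ~%.2f MiB\n", fpp, mib);
  }
  return 0;
}
```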
 
 class PARQUET_EXPORT ColumnProperties {
  public:
@@ -214,6 +240,17 @@ class PARQUET_EXPORT ColumnProperties {
     page_index_enabled_ = page_index_enabled;
   }
 
+  void set_bloom_filter_options(std::optional<BloomFilterOptions> bloom_filter_options) {
+    if (bloom_filter_options) {
+      if (bloom_filter_options->fpp > 1.0 || bloom_filter_options->fpp < 0.0) {
+        throw ParquetException(
+            "Bloom filter false-positive probability must fall in [0.0, 1.0], got " +
+            std::to_string(bloom_filter_options->fpp));
+      }
+    }
+    bloom_filter_options_ = bloom_filter_options;
+  }
+
   Encoding::type encoding() const { return encoding_; }
 
   Compression::type compression() const { return codec_; }
@@ -235,6 +272,12 @@ class PARQUET_EXPORT ColumnProperties {
 
   bool page_index_enabled() const { return page_index_enabled_; }
 
+  std::optional<BloomFilterOptions> bloom_filter_options() const {
+    return bloom_filter_options_;
+  }
+
+  bool bloom_filter_enabled() const { return bloom_filter_options_ != std::nullopt; }
+
  private:
   Encoding::type encoding_;
   Compression::type codec_;
@@ -243,6 +286,7 @@ class PARQUET_EXPORT ColumnProperties {
   size_t max_stats_size_;
   std::shared_ptr<CodecOptions> codec_options_;
   bool page_index_enabled_;
+  std::optional<BloomFilterOptions> bloom_filter_options_;
 };
 
 class PARQUET_EXPORT WriterProperties {
@@ -588,6 +632,43 @@ class PARQUET_EXPORT WriterProperties {
       return this->disable_statistics(path->ToDotString());
     }
 
+    /// Disable bloom filter for the column specified by `path`.
+    /// Disabled by default.
+    Builder* disable_bloom_filter(const std::string& path) {
+      bloom_filter_options_[path] = std::nullopt;
+      return this;
+    }
+
+    /// Disable bloom filter for the column specified by `path`.
+    /// Disabled by default.
+    Builder* disable_bloom_filter(const std::shared_ptr<schema::ColumnPath>& path) {
+      return this->disable_bloom_filter(path->ToDotString());
+    }
+
+    /// Enable bloom filter options for the column specified by `path`.
+    ///
+    /// Disabled by default.
+    ///
+    /// Note: bloom filters are currently not supported for boolean columns;
+    /// a ParquetException will be thrown at write time if one is enabled on
+    /// a column of boolean type.
+    Builder* enable_bloom_filter_options(BloomFilterOptions bloom_filter_options,
+                                         const std::string& path) {
+      bloom_filter_options_[path] = bloom_filter_options;
+      return this;
+    }
+
+    /// Enable bloom filter options for the column specified by `path`.
+    ///
+    /// Disabled by default.
+    ///
+    /// Note: bloom filters are currently not supported for boolean columns;
+    /// a ParquetException will be thrown at write time if one is enabled on
+    /// a column of boolean type.
+    Builder* enable_bloom_filter_options(
+        BloomFilterOptions bloom_filter_options,
+        const std::shared_ptr<schema::ColumnPath>& path) {
+      return this->enable_bloom_filter_options(bloom_filter_options,
+                                               path->ToDotString());
+    }
+
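A usage sketch of the builder API added above (the column names are invented; it assumes only what this diff introduces). Note that the fpp range check in `ColumnProperties::set_bloom_filter_options` fires when `build()` applies the per-column overrides:

```cpp
#include <iostream>
#include <memory>

#include "parquet/exception.h"
#include "parquet/properties.h"
#include "parquet/schema.h"

int main() {
  parquet::BloomFilterOptions options;
  options.ndv = 50'000;  // expected distinct values for this column
  options.fpp = 0.01;    // 1% false-positive rate

  auto path = parquet::schema::ColumnPath::FromDotString("user_id");

  auto properties = parquet::WriterProperties::Builder()
                        .enable_bloom_filter_options(options, path)
                        ->disable_bloom_filter("free_text")  // explicit opt-out
                        ->build();

  // Only columns that explicitly opted in report a bloom filter.
  std::cout << std::boolalpha
            << properties->bloom_filter_enabled() << "\n";               // true
  std::cout << properties->bloom_filter_options(path).has_value() << "\n";  // true

  // An out-of-range fpp is rejected when the properties are built.
  try {
    parquet::BloomFilterOptions bad = options;
    bad.fpp = 1.5;
    parquet::WriterProperties::Builder()
        .enable_bloom_filter_options(bad, "user_id")
        ->build();
  } catch (const parquet::ParquetException& e) {
    std::cout << "rejected: " << e.what() << "\n";
  }
  return 0;
}
```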
     /// Allow decimals with 1 <= precision <= 18 to be stored as integers.
     ///
     /// In Parquet, DECIMAL can be stored in any of the following physical types:
@@ -695,6 +776,8 @@ class PARQUET_EXPORT WriterProperties {
         get(item.first).set_statistics_enabled(item.second);
       for (const auto& item : page_index_enabled_)
         get(item.first).set_page_index_enabled(item.second);
+      for (const auto& item : bloom_filter_options_)
+        get(item.first).set_bloom_filter_options(item.second);
 
       return std::shared_ptr<WriterProperties>(new WriterProperties(
           pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_,
@@ -730,6 +813,8 @@ class PARQUET_EXPORT WriterProperties {
     std::unordered_map<std::string, bool> dictionary_enabled_;
     std::unordered_map<std::string, bool> statistics_enabled_;
     std::unordered_map<std::string, bool> page_index_enabled_;
+    std::unordered_map<std::string, std::optional<BloomFilterOptions>>
+        bloom_filter_options_;
   };
 
   inline MemoryPool* memory_pool() const { return pool_; }
@@ -828,6 +913,19 @@ class PARQUET_EXPORT WriterProperties {
     return false;
   }
 
+  bool bloom_filter_enabled() const {
+    // Note: enabling bloom filters for all columns is not encouraged, so
+    // default_column_properties_.bloom_filter_enabled() is always false and
+    // cannot be altered by the user; it can therefore safely be skipped here.
+    return std::any_of(column_properties_.begin(), column_properties_.end(),
+                       [](const auto& p) { return p.second.bloom_filter_enabled(); });
+  }
+
+  std::optional<BloomFilterOptions> bloom_filter_options(
+      const std::shared_ptr<schema::ColumnPath>& path) const {
+    return column_properties(path).bloom_filter_options();
+  }
+
   inline FileEncryptionProperties* file_encryption_properties() const {
     return file_encryption_properties_.get();
   }
diff --git a/cpp/src/parquet/type_fwd.h b/cpp/src/parquet/type_fwd.h
index cda0dc5a77e1f..0ba6d264cb325 100644
--- a/cpp/src/parquet/type_fwd.h
+++ b/cpp/src/parquet/type_fwd.h
@@ -89,6 +89,10 @@ class EncodedStatistics;
 class Statistics;
 struct SizeStatistics;
 
+class BloomFilter;
+struct BloomFilterOptions;
+struct BloomFilterLocation;
+
 class ColumnIndex;
 class OffsetIndex;