Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor predicate pushdown to reuse row group pruning in experimental PQ reader #17946

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 100 additions & 152 deletions cpp/src/io/parquet/bloom_filter_reader.cu
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/exec_policy.hpp>
#include <rmm/mr/device/aligned_resource_adaptor.hpp>

#include <cuco/bloom_filter_policies.cuh>
#include <cuco/bloom_filter_ref.cuh>
Expand Down Expand Up @@ -163,108 +162,6 @@ struct bloom_filter_caster {
}
};

/**
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now declared in reader_impl_helpers.hpp and defined at the bottom of this file.

* @brief Collects lists of equality predicate literals in the AST expression, one list per input
* table column. This is used in row group filtering based on bloom filters.
*/
class equality_literals_collector : public ast::detail::expression_transformer {
 public:
  /**
   * @brief Default constructor; traverses nothing and leaves the literal lists empty.
   */
  equality_literals_collector() = default;

  /**
   * @brief Traverses the expression and collects equality literals, one list per input column.
   *
   * @param expr AST expression to traverse
   * @param num_input_columns Number of input table columns; one literal list is kept per column
   */
  equality_literals_collector(ast::expression const& expr, cudf::size_type num_input_columns)
    : _num_input_columns{num_input_columns}
  {
    _equality_literals.resize(_num_input_columns);
    expr.accept(*this);
  }

  /**
   * @copydoc ast::detail::expression_transformer::visit(ast::literal const& )
   */
  std::reference_wrapper<ast::expression const> visit(ast::literal const& expr) override
  {
    // Literals are recorded from the enclosing operation, so there is nothing to do here
    return expr;
  }

  /**
   * @copydoc ast::detail::expression_transformer::visit(ast::column_reference const& )
   */
  std::reference_wrapper<ast::expression const> visit(ast::column_reference const& expr) override
  {
    CUDF_EXPECTS(expr.get_table_source() == ast::table_reference::LEFT,
                 "BloomfilterAST supports only left table");
    // The index is used to subscript `_equality_literals` in visit(ast::operation const&), so
    // reject negative values as well as values past the end
    CUDF_EXPECTS(expr.get_column_index() >= 0, "Column index cannot be negative");
    CUDF_EXPECTS(expr.get_column_index() < _num_input_columns,
                 "Column index cannot be more than number of columns in the table");
    return expr;
  }

  /**
   * @copydoc ast::detail::expression_transformer::visit(ast::column_name_reference const& )
   */
  std::reference_wrapper<ast::expression const> visit(
    ast::column_name_reference const& expr) override
  {
    CUDF_FAIL("Column name reference is not supported in BloomfilterAST");
  }

  /**
   * @copydoc ast::detail::expression_transformer::visit(ast::operation const& )
   */
  std::reference_wrapper<ast::expression const> visit(ast::operation const& expr) override
  {
    using cudf::ast::ast_operator;
    auto const operands = expr.get_operands();
    auto const op       = expr.get_operator();

    if (auto* v = dynamic_cast<ast::column_reference const*>(&operands[0].get())) {
      // First operand should be column reference, second should be literal.
      CUDF_EXPECTS(cudf::ast::detail::ast_operator_arity(op) == 2,
                   "Only binary operations are supported on column reference");
      auto const literal_ptr = dynamic_cast<ast::literal const*>(&operands[1].get());
      CUDF_EXPECTS(literal_ptr != nullptr,
                   "Second operand of binary operation with column reference must be a literal");
      // Validate the column reference (table source and index bounds) before indexing with it
      v->accept(*this);

      // Push to the corresponding column's literals list iff equality predicate is seen
      if (op == ast_operator::EQUAL) {
        auto const col_idx = v->get_column_index();
        // The lists store non-const pointers, hence the const_cast
        _equality_literals[col_idx].emplace_back(const_cast<ast::literal*>(literal_ptr));
      }
    } else {
      // Just visit the operands and ignore any output
      std::ignore = visit_operands(operands);
    }

    return expr;
  }

  /**
   * @brief Vectors of equality literals in the AST expression, one per input table column
   *
   * Callable on rvalues only: the lists are moved out of the collector.
   *
   * @return Vectors of equality literals, one per input table column
   */
  [[nodiscard]] std::vector<std::vector<ast::literal*>> get_equality_literals() &&
  {
    return std::move(_equality_literals);
  }

 private:
  // Collected literals; outer index is the input column index
  std::vector<std::vector<ast::literal*>> _equality_literals;

 protected:
  /**
   * @brief Visits every operand in order with this object as the visitor.
   *
   * @param operands Operands of an operation
   * @return The value returned by each operand's `accept`, in operand order
   */
  std::vector<std::reference_wrapper<ast::expression const>> visit_operands(
    cudf::host_span<std::reference_wrapper<ast::expression const> const> operands)
  {
    std::vector<std::reference_wrapper<ast::expression const>> transformed_operands;
    for (auto const& operand : operands) {
      auto const new_operand = operand.get().accept(*this);
      transformed_operands.push_back(new_operand);
    }
    return transformed_operands;
  }

  // Zero-initialized so that a default-constructed collector never reads an indeterminate value
  // in the column index bounds check
  size_type _num_input_columns{0};
};

/**
* @brief Converts AST expression to bloom filter membership (BloomfilterAST) expression.
* This is used in row group filtering based on equality predicate.
Expand Down Expand Up @@ -502,6 +399,17 @@ void read_bloom_filter_data(host_span<std::unique_ptr<datasource> const> sources

} // namespace

size_t aggregate_reader_metadata::get_bloom_filter_alignment() const
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cuco headers can't be used in a .cpp file, so this function is split out here (in the .cu file).

{
// Required alignment:
// https://github.com/NVIDIA/cuCollections/blob/deab5799f3e4226cb8a49acf2199c03b14941ee4/include/cuco/detail/bloom_filter/bloom_filter_impl.cuh#L55-L67
using policy_type = cuco::arrow_filter_policy<cuda::std::byte, cudf::hashing::detail::XXHash_64>;
return alignof(cuco::bloom_filter_ref<cuda::std::byte,
cuco::extent<std::size_t>,
cuco::thread_scope_thread,
policy_type>::filter_block_type);
}

std::vector<rmm::device_buffer> aggregate_reader_metadata::read_bloom_filters(
host_span<std::unique_ptr<datasource> const> sources,
host_span<std::vector<size_type> const> row_group_indices,
Expand Down Expand Up @@ -599,55 +507,19 @@ std::vector<Type> aggregate_reader_metadata::get_parquet_types(
return parquet_types;
}

std::pair<std::optional<std::vector<std::vector<size_type>>>, bool>
aggregate_reader_metadata::apply_bloom_filters(
host_span<std::unique_ptr<datasource> const> sources,
std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::apply_bloom_filters(
std::vector<rmm::device_buffer>& bloom_filter_data,
host_span<std::vector<size_type> const> input_row_group_indices,
host_span<std::vector<ast::literal*> const> literals,
size_type total_row_groups,
host_span<data_type const> output_dtypes,
host_span<int const> output_column_schemas,
host_span<int const> equality_col_schemas,
std::reference_wrapper<ast::expression const> filter,
rmm::cuda_stream_view stream) const
{
// Number of input table columns
auto const num_input_columns = static_cast<cudf::size_type>(output_dtypes.size());

// Collect equality literals for each input table column
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only the filtering step is done here; the bloom filter buffers are read in predicate_pushdown.cpp and passed in.

auto const equality_literals =
equality_literals_collector{filter.get(), num_input_columns}.get_equality_literals();

// Collect schema indices of columns with equality predicate(s)
std::vector<cudf::size_type> equality_col_schemas;
thrust::copy_if(thrust::host,
output_column_schemas.begin(),
output_column_schemas.end(),
equality_literals.begin(),
std::back_inserter(equality_col_schemas),
[](auto& eq_literals) { return not eq_literals.empty(); });

// Return early if no column with equality predicate(s)
if (equality_col_schemas.empty()) { return {std::nullopt, false}; }

// Required alignment:
// https://github.com/NVIDIA/cuCollections/blob/deab5799f3e4226cb8a49acf2199c03b14941ee4/include/cuco/detail/bloom_filter/bloom_filter_impl.cuh#L55-L67
using policy_type = cuco::arrow_filter_policy<cuda::std::byte, cudf::hashing::detail::XXHash_64>;
auto constexpr alignment = alignof(cuco::bloom_filter_ref<cuda::std::byte,
cuco::extent<std::size_t>,
cuco::thread_scope_thread,
policy_type>::filter_block_type);

// Aligned resource adaptor to allocate bloom filter buffers with
auto aligned_mr =
rmm::mr::aligned_resource_adaptor(cudf::get_current_device_resource(), alignment);

// Read a vector of bloom filter bitset device buffers for all columns with equality
// predicate(s) across all row groups
auto bloom_filter_data = read_bloom_filters(
sources, input_row_group_indices, equality_col_schemas, total_row_groups, stream, aligned_mr);

// No bloom filter buffers, return early
if (bloom_filter_data.empty()) { return {std::nullopt, false}; }

// Get parquet types for the predicate columns
auto const parquet_types = get_parquet_types(input_row_group_indices, equality_col_schemas);

Expand Down Expand Up @@ -684,13 +556,13 @@ aggregate_reader_metadata::apply_bloom_filters(
auto const& dtype = output_dtypes[input_col_idx];

// Skip if no equality literals for this column
if (equality_literals[input_col_idx].empty()) { return; }
if (literals[input_col_idx].empty()) { return; }

// Skip if non-comparable (compound) type except string
if (cudf::is_compound(dtype) and dtype.id() != cudf::type_id::STRING) { return; }

// Add a column for all literals associated with an equality column
for (auto const& literal : equality_literals[input_col_idx]) {
for (auto const& literal : literals[input_col_idx]) {
bloom_filter_membership_columns.emplace_back(cudf::type_dispatcher<dispatch_storage_type>(
dtype, bloom_filter_col, equality_col_idx, dtype, literal, stream));
}
Expand All @@ -702,16 +574,92 @@ aggregate_reader_metadata::apply_bloom_filters(

// Convert AST to BloomfilterAST expression with reference to bloom filter membership
// in above `bloom_filter_membership_table`
bloom_filter_expression_converter bloom_filter_expr{
filter.get(), num_input_columns, {equality_literals}};
bloom_filter_expression_converter bloom_filter_expr{filter.get(), num_input_columns, {literals}};

// Filter bloom filter membership table with the BloomfilterAST expression and collect
// filtered row group indices
return {collect_filtered_row_group_indices(bloom_filter_membership_table,
bloom_filter_expr.get_bloom_filter_expr(),
input_row_group_indices,
stream),
true};
return collect_filtered_row_group_indices(bloom_filter_membership_table,
bloom_filter_expr.get_bloom_filter_expr(),
input_row_group_indices,
stream);
}

// Compiler-generated default constructor: no expression is traversed.
equality_literals_collector::equality_literals_collector() = default;

// Builds a collector for `expr`: records the number of input columns, sizes the per-column
// literal lists accordingly and then traverses the expression.
equality_literals_collector::equality_literals_collector(ast::expression const& expr,
cudf::size_type num_input_columns)
: _num_input_columns{num_input_columns}
{
// One (initially empty) literal list per input column
_literals.resize(_num_input_columns);
// Start the traversal with this object as the visitor
expr.accept(*this);
}

// Visiting a literal on its own records nothing (literals are collected while visiting the
// enclosing operation); the literal is returned unchanged.
std::reference_wrapper<ast::expression const> equality_literals_collector::visit(
ast::literal const& expr)
{
return expr;
}

// Validates a column reference met during traversal and returns it unchanged.
// Fails (via CUDF_EXPECTS) unless the reference targets the left table and its column index lies
// in [0, _num_input_columns).
std::reference_wrapper<ast::expression const> equality_literals_collector::visit(
  ast::column_reference const& expr)
{
  CUDF_EXPECTS(expr.get_table_source() == ast::table_reference::LEFT,
               "BloomfilterAST supports only left table");
  // The index is used to subscript the per-column literal lists when an operation is visited,
  // so reject negative values as well as values past the end
  CUDF_EXPECTS(expr.get_column_index() >= 0, "Column index cannot be negative");
  CUDF_EXPECTS(expr.get_column_index() < _num_input_columns,
               "Column index cannot be more than number of columns in the table");
  return expr;
}

// Column name references are rejected: CUDF_FAIL is invoked unconditionally and `expr` is not
// inspected. There is no return statement, so this presumably never returns normally
// (NOTE(review): confirm CUDF_FAIL always throws).
std::reference_wrapper<ast::expression const> equality_literals_collector::visit(
ast::column_name_reference const& expr)
{
CUDF_FAIL("Column name reference is not supported in BloomfilterAST");
}

// Visits an operation node and returns it unchanged.
// When the first operand is a column reference, the operation must be binary with a literal as
// its second operand; the literal is recorded for that column only if the operator is EQUAL.
// Any other operation just has its operands visited.
std::reference_wrapper<ast::expression const> equality_literals_collector::visit(
  ast::operation const& expr)
{
  using cudf::ast::ast_operator;
  auto const children = expr.get_operands();
  auto const oper     = expr.get_operator();

  auto const* col_ref = dynamic_cast<ast::column_reference const*>(&children[0].get());

  // Not of the form `column <op> ...`: visit the operands and discard the results
  if (col_ref == nullptr) {
    std::ignore = visit_operands(children);
    return expr;
  }

  // With a column reference on the left, only `column <binary op> literal` is accepted
  CUDF_EXPECTS(cudf::ast::detail::ast_operator_arity(oper) == 2,
               "Only binary operations are supported on column reference");
  auto const* rhs_literal = dynamic_cast<ast::literal const*>(&children[1].get());
  CUDF_EXPECTS(rhs_literal != nullptr,
               "Second operand of binary operation with column reference must be a literal");

  // Visit the column reference itself before its index is used below
  col_ref->accept(*this);

  // Only equality predicates contribute a literal to the column's list
  if (oper == ast_operator::EQUAL) {
    _literals[col_ref->get_column_index()].emplace_back(const_cast<ast::literal*>(rhs_literal));
  }

  return expr;
}

// Returns the collected literal lists by moving them out of the collector.
// The function is rvalue-qualified (`&&`), so it can only be called on a temporary or an
// explicitly moved collector; `_literals` is left in a moved-from state.
std::vector<std::vector<ast::literal*>> equality_literals_collector::get_literals() &&
{
return std::move(_literals);
}

// Visits each operand in order with this object as the visitor and returns, in the same order,
// the value produced by each operand's `accept`.
std::vector<std::reference_wrapper<ast::expression const>>
equality_literals_collector::visit_operands(
  cudf::host_span<std::reference_wrapper<ast::expression const> const> operands)
{
  std::vector<std::reference_wrapper<ast::expression const>> visited;
  for (auto const& child : operands) {
    visited.push_back(child.get().accept(*this));
  }
  return visited;
}

} // namespace cudf::io::parquet::detail
Loading
Loading