-
Notifications
You must be signed in to change notification settings - Fork 931
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor predicate pushdown to reuse row group pruning in experimental PQ reader #17946
Changes from 1 commit
9889ccb
e5cb699
d8f7c9e
d322a7e
564f8e1
3fd14d0
20528ba
e842d86
f82fcb5
694cf19
cbe6669
c5bc69f
34420c2
38948d6
6632713
97d787d
978021d
2ebd9bb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,7 +32,6 @@ | |
#include <rmm/cuda_stream_view.hpp> | ||
#include <rmm/device_buffer.hpp> | ||
#include <rmm/exec_policy.hpp> | ||
#include <rmm/mr/device/aligned_resource_adaptor.hpp> | ||
|
||
#include <cuco/bloom_filter_policies.cuh> | ||
#include <cuco/bloom_filter_ref.cuh> | ||
|
@@ -163,108 +162,6 @@ struct bloom_filter_caster { | |
} | ||
}; | ||
|
||
/** | ||
* @brief Collects lists of equality predicate literals in the AST expression, one list per input | ||
* table column. This is used in row group filtering based on bloom filters. | ||
*/ | ||
class equality_literals_collector : public ast::detail::expression_transformer { | ||
public: | ||
equality_literals_collector() = default; | ||
|
||
equality_literals_collector(ast::expression const& expr, cudf::size_type num_input_columns) | ||
: _num_input_columns{num_input_columns} | ||
{ | ||
_equality_literals.resize(_num_input_columns); | ||
expr.accept(*this); | ||
} | ||
|
||
/** | ||
* @copydoc ast::detail::expression_transformer::visit(ast::literal const& ) | ||
*/ | ||
std::reference_wrapper<ast::expression const> visit(ast::literal const& expr) override | ||
{ | ||
return expr; | ||
} | ||
|
||
/** | ||
* @copydoc ast::detail::expression_transformer::visit(ast::column_reference const& ) | ||
*/ | ||
std::reference_wrapper<ast::expression const> visit(ast::column_reference const& expr) override | ||
{ | ||
CUDF_EXPECTS(expr.get_table_source() == ast::table_reference::LEFT, | ||
"BloomfilterAST supports only left table"); | ||
CUDF_EXPECTS(expr.get_column_index() < _num_input_columns, | ||
"Column index cannot be more than number of columns in the table"); | ||
return expr; | ||
} | ||
|
||
/** | ||
* @copydoc ast::detail::expression_transformer::visit(ast::column_name_reference const& ) | ||
*/ | ||
std::reference_wrapper<ast::expression const> visit( | ||
ast::column_name_reference const& expr) override | ||
{ | ||
CUDF_FAIL("Column name reference is not supported in BloomfilterAST"); | ||
} | ||
|
||
/** | ||
* @copydoc ast::detail::expression_transformer::visit(ast::operation const& ) | ||
*/ | ||
std::reference_wrapper<ast::expression const> visit(ast::operation const& expr) override | ||
{ | ||
using cudf::ast::ast_operator; | ||
auto const operands = expr.get_operands(); | ||
auto const op = expr.get_operator(); | ||
|
||
if (auto* v = dynamic_cast<ast::column_reference const*>(&operands[0].get())) { | ||
// First operand should be column reference, second should be literal. | ||
CUDF_EXPECTS(cudf::ast::detail::ast_operator_arity(op) == 2, | ||
"Only binary operations are supported on column reference"); | ||
auto const literal_ptr = dynamic_cast<ast::literal const*>(&operands[1].get()); | ||
CUDF_EXPECTS(literal_ptr != nullptr, | ||
"Second operand of binary operation with column reference must be a literal"); | ||
v->accept(*this); | ||
|
||
// Push to the corresponding column's literals list iff equality predicate is seen | ||
if (op == ast_operator::EQUAL) { | ||
auto const col_idx = v->get_column_index(); | ||
_equality_literals[col_idx].emplace_back(const_cast<ast::literal*>(literal_ptr)); | ||
} | ||
} else { | ||
// Just visit the operands and ignore any output | ||
std::ignore = visit_operands(operands); | ||
} | ||
|
||
return expr; | ||
} | ||
|
||
/** | ||
* @brief Vectors of equality literals in the AST expression, one per input table column | ||
* | ||
* @return Vectors of equality literals, one per input table column | ||
*/ | ||
[[nodiscard]] std::vector<std::vector<ast::literal*>> get_equality_literals() && | ||
{ | ||
return std::move(_equality_literals); | ||
} | ||
|
||
private: | ||
std::vector<std::vector<ast::literal*>> _equality_literals; | ||
|
||
protected: | ||
std::vector<std::reference_wrapper<ast::expression const>> visit_operands( | ||
cudf::host_span<std::reference_wrapper<ast::expression const> const> operands) | ||
{ | ||
std::vector<std::reference_wrapper<ast::expression const>> transformed_operands; | ||
for (auto const& operand : operands) { | ||
auto const new_operand = operand.get().accept(*this); | ||
transformed_operands.push_back(new_operand); | ||
} | ||
return transformed_operands; | ||
} | ||
size_type _num_input_columns; | ||
}; | ||
|
||
/** | ||
* @brief Converts AST expression to bloom filter membership (BloomfilterAST) expression. | ||
* This is used in row group filtering based on equality predicate. | ||
|
@@ -502,6 +399,17 @@ void read_bloom_filter_data(host_span<std::unique_ptr<datasource> const> sources | |
|
||
} // namespace | ||
|
||
size_t aggregate_reader_metadata::get_bloom_filter_alignment() const | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't use cuco stuff in .cpp file so separated out here |
||
{ | ||
// Required alignment: | ||
// https://github.com/NVIDIA/cuCollections/blob/deab5799f3e4226cb8a49acf2199c03b14941ee4/include/cuco/detail/bloom_filter/bloom_filter_impl.cuh#L55-L67 | ||
using policy_type = cuco::arrow_filter_policy<cuda::std::byte, cudf::hashing::detail::XXHash_64>; | ||
return alignof(cuco::bloom_filter_ref<cuda::std::byte, | ||
cuco::extent<std::size_t>, | ||
cuco::thread_scope_thread, | ||
policy_type>::filter_block_type); | ||
} | ||
|
||
std::vector<rmm::device_buffer> aggregate_reader_metadata::read_bloom_filters( | ||
host_span<std::unique_ptr<datasource> const> sources, | ||
host_span<std::vector<size_type> const> row_group_indices, | ||
|
@@ -599,55 +507,19 @@ std::vector<Type> aggregate_reader_metadata::get_parquet_types( | |
return parquet_types; | ||
} | ||
|
||
std::pair<std::optional<std::vector<std::vector<size_type>>>, bool> | ||
aggregate_reader_metadata::apply_bloom_filters( | ||
host_span<std::unique_ptr<datasource> const> sources, | ||
std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::apply_bloom_filters( | ||
std::vector<rmm::device_buffer>& bloom_filter_data, | ||
host_span<std::vector<size_type> const> input_row_group_indices, | ||
host_span<std::vector<ast::literal*> const> equality_literals, | ||
size_type total_row_groups, | ||
host_span<data_type const> output_dtypes, | ||
host_span<int const> output_column_schemas, | ||
host_span<int const> equality_col_schemas, | ||
std::reference_wrapper<ast::expression const> filter, | ||
rmm::cuda_stream_view stream) const | ||
{ | ||
// Number of input table columns | ||
auto const num_input_columns = static_cast<cudf::size_type>(output_dtypes.size()); | ||
|
||
// Collect equality literals for each input table column | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Only do the filtering step here, bloom filter buffers are read in |
||
auto const equality_literals = | ||
equality_literals_collector{filter.get(), num_input_columns}.get_equality_literals(); | ||
|
||
// Collect schema indices of columns with equality predicate(s) | ||
std::vector<cudf::size_type> equality_col_schemas; | ||
thrust::copy_if(thrust::host, | ||
output_column_schemas.begin(), | ||
output_column_schemas.end(), | ||
equality_literals.begin(), | ||
std::back_inserter(equality_col_schemas), | ||
[](auto& eq_literals) { return not eq_literals.empty(); }); | ||
|
||
// Return early if no column with equality predicate(s) | ||
if (equality_col_schemas.empty()) { return {std::nullopt, false}; } | ||
|
||
// Required alignment: | ||
// https://github.com/NVIDIA/cuCollections/blob/deab5799f3e4226cb8a49acf2199c03b14941ee4/include/cuco/detail/bloom_filter/bloom_filter_impl.cuh#L55-L67 | ||
using policy_type = cuco::arrow_filter_policy<cuda::std::byte, cudf::hashing::detail::XXHash_64>; | ||
auto constexpr alignment = alignof(cuco::bloom_filter_ref<cuda::std::byte, | ||
cuco::extent<std::size_t>, | ||
cuco::thread_scope_thread, | ||
policy_type>::filter_block_type); | ||
|
||
// Aligned resource adaptor to allocate bloom filter buffers with | ||
auto aligned_mr = | ||
rmm::mr::aligned_resource_adaptor(cudf::get_current_device_resource(), alignment); | ||
|
||
// Read a vector of bloom filter bitset device buffers for all columns with equality | ||
// predicate(s) across all row groups | ||
auto bloom_filter_data = read_bloom_filters( | ||
sources, input_row_group_indices, equality_col_schemas, total_row_groups, stream, aligned_mr); | ||
|
||
// No bloom filter buffers, return early | ||
if (bloom_filter_data.empty()) { return {std::nullopt, false}; } | ||
|
||
// Get parquet types for the predicate columns | ||
auto const parquet_types = get_parquet_types(input_row_group_indices, equality_col_schemas); | ||
|
||
|
@@ -707,11 +579,88 @@ aggregate_reader_metadata::apply_bloom_filters( | |
|
||
// Filter bloom filter membership table with the BloomfilterAST expression and collect | ||
// filtered row group indices | ||
return {collect_filtered_row_group_indices(bloom_filter_membership_table, | ||
bloom_filter_expr.get_bloom_filter_expr(), | ||
input_row_group_indices, | ||
stream), | ||
true}; | ||
return collect_filtered_row_group_indices(bloom_filter_membership_table, | ||
bloom_filter_expr.get_bloom_filter_expr(), | ||
input_row_group_indices, | ||
stream); | ||
} | ||
|
||
equality_literals_collector::equality_literals_collector() = default; | ||
|
||
equality_literals_collector::equality_literals_collector(ast::expression const& expr, | ||
cudf::size_type num_input_columns) | ||
: _num_input_columns{num_input_columns} | ||
{ | ||
_equality_literals.resize(_num_input_columns); | ||
expr.accept(*this); | ||
} | ||
|
||
std::reference_wrapper<ast::expression const> equality_literals_collector::visit( | ||
ast::literal const& expr) | ||
{ | ||
return expr; | ||
} | ||
|
||
std::reference_wrapper<ast::expression const> equality_literals_collector::visit( | ||
ast::column_reference const& expr) | ||
{ | ||
CUDF_EXPECTS(expr.get_table_source() == ast::table_reference::LEFT, | ||
"BloomfilterAST supports only left table"); | ||
CUDF_EXPECTS(expr.get_column_index() < _num_input_columns, | ||
"Column index cannot be more than number of columns in the table"); | ||
return expr; | ||
} | ||
|
||
std::reference_wrapper<ast::expression const> equality_literals_collector::visit( | ||
ast::column_name_reference const& expr) | ||
{ | ||
CUDF_FAIL("Column name reference is not supported in BloomfilterAST"); | ||
} | ||
|
||
std::reference_wrapper<ast::expression const> equality_literals_collector::visit( | ||
ast::operation const& expr) | ||
{ | ||
using cudf::ast::ast_operator; | ||
auto const operands = expr.get_operands(); | ||
auto const op = expr.get_operator(); | ||
|
||
if (auto* v = dynamic_cast<ast::column_reference const*>(&operands[0].get())) { | ||
// First operand should be column reference, second should be literal. | ||
CUDF_EXPECTS(cudf::ast::detail::ast_operator_arity(op) == 2, | ||
"Only binary operations are supported on column reference"); | ||
auto const literal_ptr = dynamic_cast<ast::literal const*>(&operands[1].get()); | ||
CUDF_EXPECTS(literal_ptr != nullptr, | ||
"Second operand of binary operation with column reference must be a literal"); | ||
v->accept(*this); | ||
|
||
// Push to the corresponding column's literals list iff equality predicate is seen | ||
if (op == ast_operator::EQUAL) { | ||
auto const col_idx = v->get_column_index(); | ||
_equality_literals[col_idx].emplace_back(const_cast<ast::literal*>(literal_ptr)); | ||
} | ||
} else { | ||
// Just visit the operands and ignore any output | ||
std::ignore = visit_operands(operands); | ||
} | ||
|
||
return expr; | ||
} | ||
|
||
std::vector<std::vector<ast::literal*>> equality_literals_collector::get_equality_literals() && | ||
{ | ||
return std::move(_equality_literals); | ||
} | ||
|
||
std::vector<std::reference_wrapper<ast::expression const>> | ||
equality_literals_collector::visit_operands( | ||
cudf::host_span<std::reference_wrapper<ast::expression const> const> operands) | ||
{ | ||
std::vector<std::reference_wrapper<ast::expression const>> transformed_operands; | ||
for (auto const& operand : operands) { | ||
auto const new_operand = operand.get().accept(*this); | ||
transformed_operands.push_back(new_operand); | ||
} | ||
return transformed_operands; | ||
} | ||
|
||
} // namespace cudf::io::parquet::detail |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now declared in
reader_impl_helpers.hpp
and defined at the bottom of this file.