Skip to content

Commit 32bdfb0

Browse files
authored
Separate stats filtering helpers to reuse in page pruning (#18034)
Contributes to #17896 This PR separates stats based filtering helpers for reuse in page pruning using stats in Parquet PageIndex. Authors: - Muhammad Haseeb (https://github.com/mhaseeb123) Approvers: - Karthikeyan (https://github.com/karthikeyann) - Vukasin Milovanovic (https://github.com/vuule) - Vyas Ramasubramani (https://github.com/vyasr) URL: #18034
1 parent 68ae48c commit 32bdfb0

File tree

4 files changed

+416
-294
lines changed

4 files changed

+416
-294
lines changed

cpp/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,7 @@ add_library(
528528
src/io/parquet/reader_impl_chunking.cu
529529
src/io/parquet/reader_impl_helpers.cpp
530530
src/io/parquet/reader_impl_preprocess.cu
531+
src/io/parquet/stats_filter_helpers.cpp
531532
src/io/parquet/writer_impl.cu
532533
src/io/parquet/writer_impl_helpers.cpp
533534
src/io/parquet/decode_fixed.cu

cpp/src/io/parquet/predicate_pushdown.cpp

+15-294
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@
1414
* limitations under the License.
1515
*/
1616
#include "reader_impl_helpers.hpp"
17+
#include "stats_filter_helpers.hpp"
1718

1819
#include <cudf/ast/detail/expression_transformer.hpp>
1920
#include <cudf/ast/detail/operators.hpp>
2021
#include <cudf/ast/expressions.hpp>
2122
#include <cudf/column/column_factories.hpp>
2223
#include <cudf/detail/iterator.cuh>
2324
#include <cudf/detail/transform.hpp>
24-
#include <cudf/detail/utilities/integer_utils.hpp>
2525
#include <cudf/detail/utilities/vector_factories.hpp>
2626
#include <cudf/utilities/error.hpp>
2727
#include <cudf/utilities/memory_resource.hpp>
@@ -34,97 +34,32 @@
3434
#include <thrust/iterator/counting_iterator.h>
3535

3636
#include <algorithm>
37-
#include <limits>
3837
#include <numeric>
3938
#include <optional>
4039
#include <unordered_set>
4140

4241
namespace cudf::io::parquet::detail {
4342

4443
namespace {
44+
4545
/**
46-
* @brief Converts statistics in column chunks to 2 device columns - min, max values.
46+
* @brief Converts column chunk statistics to 2 device columns - min, max values.
47+
*
48+
* Each column's number of rows equals the total number of row groups.
4749
*
4850
*/
49-
struct stats_caster {
51+
struct row_group_stats_caster : public stats_caster_base {
5052
size_type total_row_groups;
5153
std::vector<metadata> const& per_file_metadata;
5254
host_span<std::vector<size_type> const> row_group_indices;
5355

54-
template <typename ToType, typename FromType>
55-
static ToType targetType(FromType const value)
56-
{
57-
if constexpr (cudf::is_timestamp<ToType>()) {
58-
return static_cast<ToType>(
59-
typename ToType::duration{static_cast<typename ToType::rep>(value)});
60-
} else if constexpr (std::is_same_v<ToType, string_view>) {
61-
return ToType{nullptr, 0};
62-
} else {
63-
return static_cast<ToType>(value);
64-
}
65-
}
66-
67-
// uses storage type as T
68-
template <typename T, CUDF_ENABLE_IF(cudf::is_dictionary<T>() or cudf::is_nested<T>())>
69-
static T convert(uint8_t const* stats_val, size_t stats_size, Type const type)
70-
{
71-
CUDF_FAIL("unsupported type for stats casting");
72-
}
73-
74-
template <typename T, CUDF_ENABLE_IF(cudf::is_boolean<T>())>
75-
static T convert(uint8_t const* stats_val, size_t stats_size, Type const type)
76-
{
77-
CUDF_EXPECTS(type == BOOLEAN, "Invalid type and stats combination");
78-
return targetType<T>(*reinterpret_cast<bool const*>(stats_val));
79-
}
80-
81-
// integral but not boolean, and fixed_point, and chrono.
82-
template <typename T,
83-
CUDF_ENABLE_IF((cudf::is_integral<T>() and !cudf::is_boolean<T>()) or
84-
cudf::is_fixed_point<T>() or cudf::is_chrono<T>())>
85-
static T convert(uint8_t const* stats_val, size_t stats_size, Type const type)
86-
{
87-
switch (type) {
88-
case INT32: return targetType<T>(*reinterpret_cast<int32_t const*>(stats_val));
89-
case INT64: return targetType<T>(*reinterpret_cast<int64_t const*>(stats_val));
90-
case INT96: // Deprecated in parquet specification
91-
return targetType<T>(static_cast<__int128_t>(reinterpret_cast<int64_t const*>(stats_val)[0])
92-
<< 32 |
93-
reinterpret_cast<int32_t const*>(stats_val)[2]);
94-
case BYTE_ARRAY: [[fallthrough]];
95-
case FIXED_LEN_BYTE_ARRAY:
96-
if (stats_size == sizeof(T)) {
97-
// if type size == length of stats_val. then typecast and return.
98-
if constexpr (cudf::is_chrono<T>()) {
99-
return targetType<T>(*reinterpret_cast<typename T::rep const*>(stats_val));
100-
} else {
101-
return targetType<T>(*reinterpret_cast<T const*>(stats_val));
102-
}
103-
}
104-
// unsupported type
105-
default: CUDF_FAIL("Invalid type and stats combination");
106-
}
107-
}
108-
109-
template <typename T, CUDF_ENABLE_IF(cudf::is_floating_point<T>())>
110-
static T convert(uint8_t const* stats_val, size_t stats_size, Type const type)
111-
{
112-
switch (type) {
113-
case FLOAT: return targetType<T>(*reinterpret_cast<float const*>(stats_val));
114-
case DOUBLE: return targetType<T>(*reinterpret_cast<double const*>(stats_val));
115-
default: CUDF_FAIL("Invalid type and stats combination");
116-
}
117-
}
118-
119-
template <typename T, CUDF_ENABLE_IF(std::is_same_v<T, string_view>)>
120-
static T convert(uint8_t const* stats_val, size_t stats_size, Type const type)
56+
row_group_stats_caster(size_type total_row_groups,
57+
std::vector<metadata> const& per_file_metadata,
58+
host_span<std::vector<size_type> const> row_group_indices)
59+
: total_row_groups{total_row_groups},
60+
per_file_metadata{per_file_metadata},
61+
row_group_indices{row_group_indices}
12162
{
122-
switch (type) {
123-
case BYTE_ARRAY: [[fallthrough]];
124-
case FIXED_LEN_BYTE_ARRAY:
125-
return string_view(reinterpret_cast<char const*>(stats_val), stats_size);
126-
default: CUDF_FAIL("Invalid type and stats combination");
127-
}
12863
}
12964

13065
// Creates device columns from column statistics (min, max)
@@ -139,82 +74,8 @@ struct stats_caster {
13974
if constexpr (cudf::is_compound<T>() && !std::is_same_v<T, string_view>) {
14075
CUDF_FAIL("Compound types do not have statistics");
14176
} else {
142-
// Local struct to hold host columns
143-
struct host_column {
144-
// using thrust::host_vector because std::vector<bool> uses bitmap instead of byte per bool.
145-
cudf::detail::host_vector<T> val;
146-
std::vector<bitmask_type> null_mask;
147-
cudf::size_type null_count = 0;
148-
host_column(size_type total_row_groups, rmm::cuda_stream_view stream)
149-
: val{cudf::detail::make_host_vector<T>(total_row_groups, stream)},
150-
null_mask(
151-
cudf::util::div_rounding_up_safe<size_type>(
152-
cudf::bitmask_allocation_size_bytes(total_row_groups), sizeof(bitmask_type)),
153-
~bitmask_type{0})
154-
{
155-
}
156-
157-
void set_index(size_type index,
158-
std::optional<std::vector<uint8_t>> const& binary_value,
159-
Type const type)
160-
{
161-
if (binary_value.has_value()) {
162-
val[index] = convert<T>(binary_value.value().data(), binary_value.value().size(), type);
163-
}
164-
if (not binary_value.has_value()) {
165-
clear_bit_unsafe(null_mask.data(), index);
166-
null_count++;
167-
}
168-
}
169-
170-
static auto make_strings_children(host_span<string_view> host_strings,
171-
rmm::cuda_stream_view stream,
172-
rmm::device_async_resource_ref mr)
173-
{
174-
auto const total_char_count = std::accumulate(
175-
host_strings.begin(), host_strings.end(), 0, [](auto sum, auto const& str) {
176-
return sum + str.size_bytes();
177-
});
178-
auto chars = cudf::detail::make_empty_host_vector<char>(total_char_count, stream);
179-
auto offsets =
180-
cudf::detail::make_empty_host_vector<cudf::size_type>(host_strings.size() + 1, stream);
181-
offsets.push_back(0);
182-
for (auto const& str : host_strings) {
183-
auto tmp =
184-
str.empty() ? std::string_view{} : std::string_view(str.data(), str.size_bytes());
185-
chars.insert(chars.end(), std::cbegin(tmp), std::cend(tmp));
186-
offsets.push_back(offsets.back() + tmp.length());
187-
}
188-
auto d_chars = cudf::detail::make_device_uvector_async(chars, stream, mr);
189-
auto d_offsets = cudf::detail::make_device_uvector_sync(offsets, stream, mr);
190-
return std::tuple{std::move(d_chars), std::move(d_offsets)};
191-
}
192-
193-
auto to_device(cudf::data_type dtype,
194-
rmm::cuda_stream_view stream,
195-
rmm::device_async_resource_ref mr)
196-
{
197-
if constexpr (std::is_same_v<T, string_view>) {
198-
auto [d_chars, d_offsets] = make_strings_children(val, stream, mr);
199-
return cudf::make_strings_column(
200-
val.size(),
201-
std::make_unique<column>(std::move(d_offsets), rmm::device_buffer{}, 0),
202-
d_chars.release(),
203-
null_count,
204-
rmm::device_buffer{
205-
null_mask.data(), cudf::bitmask_allocation_size_bytes(val.size()), stream, mr});
206-
}
207-
return std::make_unique<column>(
208-
dtype,
209-
val.size(),
210-
cudf::detail::make_device_uvector_async(val, stream, mr).release(),
211-
rmm::device_buffer{
212-
null_mask.data(), cudf::bitmask_allocation_size_bytes(val.size()), stream, mr},
213-
null_count);
214-
}
215-
}; // local struct host_column
216-
host_column min(total_row_groups, stream);
217-
host_column max(total_row_groups, stream);
77+
host_column<T> min(total_row_groups, stream);
78+
host_column<T> max(total_row_groups, stream);
21879
size_type stats_idx = 0;
21980
for (size_t src_idx = 0; src_idx < row_group_indices.size(); ++src_idx) {
22081
for (auto const rg_idx : row_group_indices[src_idx]) {
@@ -248,146 +109,6 @@ struct stats_caster {
248109
}
249110
};
250111

251-
/**
252-
* @brief Converts AST expression to StatsAST for comparing with column statistics
253-
* This is used in row group filtering based on predicate.
254-
* statistics min value of a column is referenced by column_index*2
255-
* statistics max value of a column is referenced by column_index*2+1
256-
*
257-
*/
258-
class stats_expression_converter : public ast::detail::expression_transformer {
259-
public:
260-
stats_expression_converter(ast::expression const& expr, size_type const& num_columns)
261-
: _num_columns{num_columns}
262-
{
263-
expr.accept(*this);
264-
}
265-
266-
/**
267-
* @copydoc ast::detail::expression_transformer::visit(ast::literal const& )
268-
*/
269-
std::reference_wrapper<ast::expression const> visit(ast::literal const& expr) override
270-
{
271-
return expr;
272-
}
273-
274-
/**
275-
* @copydoc ast::detail::expression_transformer::visit(ast::column_reference const& )
276-
*/
277-
std::reference_wrapper<ast::expression const> visit(ast::column_reference const& expr) override
278-
{
279-
CUDF_EXPECTS(expr.get_table_source() == ast::table_reference::LEFT,
280-
"Statistics AST supports only left table");
281-
CUDF_EXPECTS(expr.get_column_index() < _num_columns,
282-
"Column index cannot be more than number of columns in the table");
283-
return expr;
284-
}
285-
286-
/**
287-
* @copydoc ast::detail::expression_transformer::visit(ast::column_name_reference const& )
288-
*/
289-
std::reference_wrapper<ast::expression const> visit(
290-
ast::column_name_reference const& expr) override
291-
{
292-
CUDF_FAIL("Column name reference is not supported in statistics AST");
293-
}
294-
295-
/**
296-
* @copydoc ast::detail::expression_transformer::visit(ast::operation const& )
297-
*/
298-
std::reference_wrapper<ast::expression const> visit(ast::operation const& expr) override
299-
{
300-
using cudf::ast::ast_operator;
301-
auto const operands = expr.get_operands();
302-
auto const op = expr.get_operator();
303-
304-
if (auto* v = dynamic_cast<ast::column_reference const*>(&operands[0].get())) {
305-
// First operand should be column reference, second should be literal.
306-
CUDF_EXPECTS(cudf::ast::detail::ast_operator_arity(op) == 2,
307-
"Only binary operations are supported on column reference");
308-
CUDF_EXPECTS(dynamic_cast<ast::literal const*>(&operands[1].get()) != nullptr,
309-
"Second operand of binary operation with column reference must be a literal");
310-
v->accept(*this);
311-
// Push literal into the ast::tree
312-
auto const& literal =
313-
_stats_expr.push(*dynamic_cast<ast::literal const*>(&operands[1].get()));
314-
auto const col_index = v->get_column_index();
315-
switch (op) {
316-
/* transform to stats conditions. op(col, literal)
317-
col1 == val --> vmin <= val && vmax >= val
318-
col1 != val --> !(vmin == val && vmax == val)
319-
col1 > val --> vmax > val
320-
col1 < val --> vmin < val
321-
col1 >= val --> vmax >= val
322-
col1 <= val --> vmin <= val
323-
*/
324-
case ast_operator::EQUAL: {
325-
auto const& vmin = _stats_expr.push(ast::column_reference{col_index * 2});
326-
auto const& vmax = _stats_expr.push(ast::column_reference{col_index * 2 + 1});
327-
_stats_expr.push(ast::operation{
328-
ast::ast_operator::LOGICAL_AND,
329-
_stats_expr.push(ast::operation{ast_operator::GREATER_EQUAL, vmax, literal}),
330-
_stats_expr.push(ast::operation{ast_operator::LESS_EQUAL, vmin, literal})});
331-
break;
332-
}
333-
case ast_operator::NOT_EQUAL: {
334-
auto const& vmin = _stats_expr.push(ast::column_reference{col_index * 2});
335-
auto const& vmax = _stats_expr.push(ast::column_reference{col_index * 2 + 1});
336-
_stats_expr.push(ast::operation{
337-
ast_operator::LOGICAL_OR,
338-
_stats_expr.push(ast::operation{ast_operator::NOT_EQUAL, vmin, vmax}),
339-
_stats_expr.push(ast::operation{ast_operator::NOT_EQUAL, vmax, literal})});
340-
break;
341-
}
342-
case ast_operator::LESS: [[fallthrough]];
343-
case ast_operator::LESS_EQUAL: {
344-
auto const& vmin = _stats_expr.push(ast::column_reference{col_index * 2});
345-
_stats_expr.push(ast::operation{op, vmin, literal});
346-
break;
347-
}
348-
case ast_operator::GREATER: [[fallthrough]];
349-
case ast_operator::GREATER_EQUAL: {
350-
auto const& vmax = _stats_expr.push(ast::column_reference{col_index * 2 + 1});
351-
_stats_expr.push(ast::operation{op, vmax, literal});
352-
break;
353-
}
354-
default: CUDF_FAIL("Unsupported operation in Statistics AST");
355-
};
356-
} else {
357-
auto new_operands = visit_operands(operands);
358-
if (cudf::ast::detail::ast_operator_arity(op) == 2) {
359-
_stats_expr.push(ast::operation{op, new_operands.front(), new_operands.back()});
360-
} else if (cudf::ast::detail::ast_operator_arity(op) == 1) {
361-
_stats_expr.push(ast::operation{op, new_operands.front()});
362-
}
363-
}
364-
return _stats_expr.back();
365-
}
366-
367-
/**
368-
* @brief Returns the AST to apply on Column chunk statistics.
369-
*
370-
* @return AST operation expression
371-
*/
372-
[[nodiscard]] std::reference_wrapper<ast::expression const> get_stats_expr() const
373-
{
374-
return _stats_expr.back();
375-
}
376-
377-
private:
378-
std::vector<std::reference_wrapper<ast::expression const>> visit_operands(
379-
cudf::host_span<std::reference_wrapper<ast::expression const> const> operands)
380-
{
381-
std::vector<std::reference_wrapper<ast::expression const>> transformed_operands;
382-
for (auto const& operand : operands) {
383-
auto const new_operand = operand.get().accept(*this);
384-
transformed_operands.push_back(new_operand);
385-
}
386-
return transformed_operands;
387-
}
388-
ast::tree _stats_expr;
389-
size_type _num_columns;
390-
};
391112
} // namespace
392113

393114
std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::apply_stats_filters(
@@ -404,7 +125,7 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::ap
404125
// where min(col[i]) = columns[i*2], max(col[i])=columns[i*2+1]
405126
// For each column, it contains #sources * #column_chunks_per_src rows.
406127
std::vector<std::unique_ptr<column>> columns;
407-
stats_caster const stats_col{
128+
row_group_stats_caster const stats_col{
408129
static_cast<size_type>(total_row_groups), per_file_metadata, input_row_group_indices};
409130
for (size_t col_idx = 0; col_idx < output_dtypes.size(); col_idx++) {
410131
auto const schema_idx = output_column_schemas[col_idx];

0 commit comments

Comments
 (0)