diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu index d3e60158c91..488c8de3ca7 100644 --- a/cpp/src/io/parquet/decode_fixed.cu +++ b/cpp/src/io/parquet/decode_fixed.cu @@ -930,6 +930,8 @@ constexpr bool is_split_decode() * @param num_rows Maximum number of rows to read * @param page_mask Boolean vector indicating which pages need to be decoded * @param initial_str_offsets Vector to store the initial offsets for large nested string cols + * @param page_string_offset_indices Device span of offsets, indexed per-page, into the column's + * string offset buffer * @param error_code Error code to set if an error is encountered */ template @@ -940,6 +942,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8) size_t num_rows, cudf::device_span page_mask, cudf::device_span initial_str_offsets, + cudf::device_span page_string_offset_indices, kernel_error::pointer error_code) { constexpr bool has_dict_t = has_dict(); @@ -954,10 +957,10 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8) constexpr int rle_run_buffer_size = rle_stream_required_run_buffer_size(); __shared__ __align__(16) page_state_s state_g; - constexpr bool use_dict_buffers = has_dict_t || has_bools_t || has_strings_t; + constexpr bool use_dict_buffers = has_dict_t || has_bools_t; using state_buf_t = page_state_buffers_s; + 1>; __shared__ __align__(16) state_buf_t state_buffers; auto const block = cg::this_thread_block(); @@ -1083,6 +1086,8 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8) int valid_count = 0; size_t string_output_offset = 0; int const init_valid_map_offset = s->nesting_info[s->col.max_nesting_depth - 1].valid_map_offset; + uint32_t* const str_offsets = + s->col.column_string_offset_base + page_string_offset_indices[page_idx]; // Skip ahead in the decoding so that we don't repeat work (skipped_leaf_values = 0 for non-lists) auto const skipped_leaf_values = s->page.skipped_leaf_values; @@ -1094,10 +1099,6 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8) processed_count = skip_decode(rep_decoder, skipped_leaf_values, t); if constexpr (has_dict_t) { skip_decode(dict_stream, skipped_leaf_values, t); - } else if constexpr (has_strings_t) { - initialize_string_descriptors(s, sb, skipped_leaf_values, block); - if (t == 0) { s->dict_pos = processed_count; } - block.sync(); } else if constexpr (has_bools_t) { if (bools_are_rle_stream) { skip_decode(bool_stream, skipped_leaf_values, t); @@ -1165,11 +1166,6 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8) if constexpr (has_dict_t) { dict_stream.decode_next(t, next_valid_count - valid_count); block.sync(); - } else if constexpr (has_strings_t) { - auto const target_pos = next_valid_count + skipped_leaf_values; - initialize_string_descriptors(s, sb, target_pos, block); - if (t == 0) { s->dict_pos = target_pos; } - block.sync(); } else if constexpr (has_bools_t) { if (bools_are_rle_stream) { bool_stream.decode_next(t, next_valid_count - valid_count); @@ -1184,8 +1180,8 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8) auto decode_values = [&]() { if constexpr (has_strings_t) { string_output_offset = - decode_strings( - s, sb, valid_count, next_valid_count, t, string_output_offset); + decode_strings( + s, sb, valid_count, next_valid_count, t, str_offsets, string_output_offset); } else if constexpr (split_decode_t) { decode_fixed_width_split_values( s, sb, valid_count, next_valid_count, t); @@ -1269,6 +1265,7 @@ void decode_page_data(cudf::detail::hostdevice_span pages, decode_kernel_mask kernel_mask, cudf::device_span page_mask, cudf::device_span initial_str_offsets, + cudf::device_span page_string_offset_indices, kernel_error::pointer error_code, rmm::cuda_stream_view stream) { @@ -1288,6 +1285,7 @@ void decode_page_data(cudf::detail::hostdevice_span pages, num_rows, page_mask, initial_str_offsets, + page_string_offset_indices, error_code); } else { decode_page_data_generic @@ -1297,6 +1295,7 @@ void decode_page_data(cudf::detail::hostdevice_span pages, num_rows, page_mask, initial_str_offsets, + page_string_offset_indices, error_code); } }; diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu index 0c9f3493736..92268224605 100644 --- a/cpp/src/io/parquet/page_string_decode.cu +++ b/cpp/src/io/parquet/page_string_decode.cu @@ -1101,4 +1101,331 @@ void compute_page_string_sizes_pass2(cudf::detail::hostdevice_span pag } } +/** + * @brief Helper function to prefetch next chunk of data into shared memory using multiple threads + * + * @param t Thread index within block + * @param next_length_offset Current offset in the data stream + * @param dict_size Total size of the dictionary/data + * @param cur Pointer to the start of the data + * @param prefetch_buffer Shared memory buffer for prefetching + * @param[in,out] buffer_base Offset corresponding to the start of the prefetched buffer + * @param[in,out] buffer_end Offset corresponding to the end of the prefetched buffer + * @return Whether the buffer contains valid data + */ +template +inline __device__ bool prefetch_string_data(int t, + int32_t next_length_offset, + int32_t dict_size, + uint8_t const* cur, + uint8_t* prefetch_buffer, + int32_t& buffer_base, + int32_t& buffer_end) +{ + // The start of the prefetched buffer is the offset to the next length in the data stream + buffer_base = next_length_offset; + + int32_t const total_bytes_to_copy = cuda::std::min(prefetch_size, dict_size - buffer_base); + if (total_bytes_to_copy <= 0) { return false; } // No data left to copy + buffer_end = buffer_base + total_bytes_to_copy; + + // Nominally, each thread will copy an equal number of bytes; this rounds up. + auto const nominal_thread_bytes_to_copy = + cudf::util::div_rounding_up_unsafe(total_bytes_to_copy, block_size); + int32_t const thread_offset = nominal_thread_bytes_to_copy * t; + + if (thread_offset < total_bytes_to_copy) { + // Guard against the end of the data stream + int32_t const thread_bytes_to_copy = + cuda::std::min(nominal_thread_bytes_to_copy, total_bytes_to_copy - thread_offset); + + if (thread_bytes_to_copy > 0) { + int32_t const thread_copy_from_index = buffer_base + thread_offset; + cuda::std::memcpy(reinterpret_cast(&prefetch_buffer[thread_offset]), + reinterpret_cast(&cur[thread_copy_from_index]), + thread_bytes_to_copy); + } + } + return true; +} + +/** + * @brief Read string offsets with buffering and prefetching + * + * This function uses a prefetch buffer to efficiently process string offsets by + * reading data in chunks. Thread 0 reads string lengths, and all threads cooperate + * on prefetching data and filling the remaining entries. + * + * @param s Page state containing data_start, dict_size and other info + * @param num_values_to_skip Number of values to skip before processing + * @param num_values_to_process Number of values to process + * @param str_offsets Output buffer for string offsets + */ +template +inline __device__ void read_string_offsets_buffered(page_state_s* s, + size_t num_values_to_skip, + size_t num_values_to_process, + uint32_t* str_offsets) +{ + auto const block = cg::this_thread_block(); + int const t = block.thread_rank(); + uint8_t const* cur = s->data_start; + auto const dict_size = s->dict_size; + + int32_t buffer_base = 0; + int32_t buffer_end = 0; + int32_t next_length_offset = 0; + __shared__ __align__(128) uint8_t prefetch_buffer[prefetch_size]; + + auto process_data = [&](size_t loop_end) { + // Parquet data is: 4-byte length, string, 4-byte length, string, ... + for (size_t pos = 0; pos < loop_end; pos++) { + int32_t const string_offset = next_length_offset + sizeof(int32_t); + if (string_offset > dict_size) { return pos; } // Can't read any more valid data + + // Check if we need to prefetch more data + if ((next_length_offset + sizeof(int32_t)) > buffer_end) { + block.sync(); // Make sure all of the threads have finished reading the previous data + if (!prefetch_string_data( + t, next_length_offset, dict_size, cur, prefetch_buffer, buffer_base, buffer_end)) { + return pos; // End of the data + } + block.sync(); // Sync all of the prefetched data for all of the threads + } + + // Read the length of the string from the prefetched buffer + int32_t const prefetch_read_index = next_length_offset - buffer_base; + int32_t len; + cuda::std::memcpy(reinterpret_cast(&len), + reinterpret_cast(&prefetch_buffer[prefetch_read_index]), + sizeof(int32_t)); + + if (string_offset + len > dict_size) { return pos; } // Data is corrupted or incomplete + next_length_offset = string_offset + len; + + if constexpr (save_offset) { + if (t == 0) { str_offsets[pos] = string_offset; } + } + } + return loop_end; + }; + + // Initial prefetch + if (!prefetch_string_data( + t, next_length_offset, dict_size, cur, prefetch_buffer, buffer_base, buffer_end)) { + return; // No data to process + } + block.sync(); // Sync all of the prefetched data for all of the threads + + // Skip to the first value we need to process + int32_t const skip_pos = process_data.template operator()(num_values_to_skip); + + // Process the values we need - only if we successfully skipped past all the values we needed to + size_t const num_values_written = + (skip_pos != num_values_to_skip) + ? 0 + : process_data.template operator()(num_values_to_process); + + // +4 for "stored" length of "next" string that we'll subtract off during decode + // Easier/faster than branching in the decode loop + int32_t const last_string_offset = next_length_offset + sizeof(int32_t); + + // Use all threads in the block to fill remaining entries with the last offset + // This fills in the rest if we break early above because the dictionary wasn't large enough + // But it also fills in the last offset for the page, hence the +1 in the loop limit. + for (size_t pos = num_values_written + t; pos < num_values_to_process + 1; pos += block_size) { + str_offsets[pos] = last_string_offset; + } +} + +/** + * @brief Read string offsets without buffering for large average string lengths + * + * This function handles the case where the average string length is too large + * to use the buffering approach. Thread 0 sequentially reads all string lengths, + * then all threads cooperatively fill the remaining entries. + * + * @param s Page state containing data_start, dict_size and other info + * @param num_values_to_skip Number of values to skip before processing + * @param num_values_to_process Number of values to process + * @param str_offsets Output buffer for string offsets + */ +template +inline __device__ void read_string_offsets_sequential(page_state_s* s, + size_t num_values_to_skip, + size_t num_values_to_process, + uint32_t* str_offsets) +{ + auto const block = cg::this_thread_block(); + int const t = block.thread_rank(); + + __shared__ size_t num_values_written; + __shared__ uint32_t last_offset; // offset into data_start + + if (t == 0) { + uint8_t const* cur = s->data_start; + uint32_t length_offset = 0; + auto const dict_size = s->dict_size; + + auto process_loop = [&](size_t loop_end) { + // Parquet data is: 4-byte length, string, 4-byte length, string, ... + for (size_t pos = 0; pos < loop_end; pos++) { + uint32_t const string_offset = length_offset + sizeof(int32_t); + if (string_offset > dict_size) { return pos; } // Can't read any more valid data + + // Read the length of the string from the data stream + int32_t len; + cuda::std::memcpy(reinterpret_cast(&len), + reinterpret_cast(&cur[length_offset]), + sizeof(int32_t)); + + if (string_offset + len > dict_size) { return pos; } // Data is corrupted or incomplete + if constexpr (save_offsets) { str_offsets[pos] = string_offset; } + length_offset = string_offset + len; + } + return loop_end; + }; + + // Skip to the first value we need to process, then process the values we need + size_t const skip_pos = process_loop.template operator()(num_values_to_skip); + if (skip_pos == num_values_to_skip) { + num_values_written = process_loop.template operator()(num_values_to_process); + } else { + num_values_written = 0; // Skipped past all the values in the page + } + + // +4 for "stored" length of "next" string that we'll subtract off during decode + last_offset = length_offset + sizeof(int32_t); + } + + block.sync(); // Ensure all threads see num_values_written + + // Use all threads in the block to fill remaining entries with the last offset + // This fills in the rest if we break early above because the dictionary wasn't large enough + // But it also fills in the last offset for the page, hence the +1 in the loop limit. + for (size_t pos = num_values_written + t; pos < num_values_to_process + 1; pos += block_size) { + str_offsets[pos] = last_offset; + } +} + +/** + * @brief Pre-processing kernel to fill string offsets for non-dictionary string columns + * + * This kernel runs before the main decode kernel to pre-compute string offsets + * for columns that use plain encoding without dictionaries. + * + * @param pages List of pages + * @param chunks List of column chunks + * @param page_string_offset_indices Device span of offsets, indexed per-page, into the column's + * string offset buffer + * @param page_mask Boolean vector indicating which pages need to be processed + * @param min_row Minimum row index to read + * @param num_rows Number of rows to read starting from min_row + */ +template +CUDF_KERNEL void preprocess_string_offsets_kernel( + PageInfo* pages, + device_span chunks, + device_span page_string_offset_indices, + cudf::device_span page_mask, + size_t min_row, + size_t num_rows) +{ + int const page_idx = cg::this_grid().block_rank(); + PageInfo* const pp = &pages[page_idx]; + + // Don't process pages that don't need to be decoded + if (not page_mask.empty() and not page_mask[page_idx]) { return; } + + auto const& chunk = chunks[pp->chunk_idx]; + if (chunk.physical_type == Type::FIXED_LEN_BYTE_ARRAY) { + return; // String lengths, and thus offsets are fixed, no need to preprocess + } + + // Check if this is a string column without dictionary using kernel mask + constexpr uint32_t STRINGS_MASK_NON_DELTA_NON_DICT = + BitOr(decode_kernel_mask::STRING, + decode_kernel_mask::STRING_NESTED, + decode_kernel_mask::STRING_LIST, + decode_kernel_mask::STRING_STREAM_SPLIT, + decode_kernel_mask::STRING_STREAM_SPLIT_NESTED, + decode_kernel_mask::STRING_STREAM_SPLIT_LIST); + + __shared__ __align__(16) page_state_s state_g; + page_state_s* const s = &state_g; + if (!setup_local_page_info(s, + pp, + chunks, + min_row, + num_rows, + mask_filter{STRINGS_MASK_NON_DELTA_NON_DICT}, + page_processing_stage::DECODE)) { + return; + } + + // Determine if this is a list column and how many values to process + // For non-lists, we don't know how many values we'll need to read, because we don't know + // how many nulls we'll skip. So we have to read through the skipped rows. + bool const is_list = (chunk.max_level[level_type::REPETITION] != 0); + size_t const num_values_to_skip = is_list ? pp->skipped_leaf_values : 0; + size_t const num_values_to_process = + is_list ? pp->nesting[chunk.max_nesting_depth - 1].batch_size : s->num_rows + s->first_row; + + if (num_values_to_process == 0) { return; } + + uint32_t* const str_offsets = + chunk.column_string_offset_base + page_string_offset_indices[page_idx]; + + // If the average string length is small, all of the threads in the warp can iteratively prefetch + // blocks of data containing many string lengths into shared memory. + // However, if the average string length is large, we'll spend too much time copying raw string + // data we don't need: have a single thread read the string lengths sequentially. + constexpr int max_avg_string_length_for_buffer = prefetch_size / 16; // for 1024 buffer, is 64 + auto const avg_string_length = s->dict_size / pp->num_input_values - sizeof(int32_t); + + if (avg_string_length > max_avg_string_length_for_buffer) { + // Use sequential processing for large average string lengths + read_string_offsets_sequential( + s, num_values_to_skip, num_values_to_process, str_offsets); + } else { + // Use buffered processing for typical string lengths + read_string_offsets_buffered( + s, num_values_to_skip, num_values_to_process, str_offsets); + } +} + +/** + * @brief Launches the pre-processing kernel to fill string offsets for non-dictionary, non-FLBA + * string columns + * + * @param pages List of pages + * @param chunks List of column chunks + * @param page_string_offset_indices Device span of offsets, indexed per-page, into the column's + * string offset buffer + * @param page_mask Boolean vector indicating which pages need to be processed + * @param min_row Minimum row index to read + * @param num_rows Number of rows to read starting from min_row + * @param stream CUDA stream to use + */ +void preprocess_string_offsets(cudf::detail::hostdevice_span pages, + cudf::detail::hostdevice_span chunks, + cudf::device_span page_string_offset_indices, + cudf::device_span page_mask, + size_t min_row, + size_t num_rows, + rmm::cuda_stream_view stream) +{ + if (pages.size() == 0) { return; } + + constexpr int preprocess_block_size = cudf::detail::warp_size; + constexpr int prefetch_size = 1024; + + dim3 dim_block(preprocess_block_size, 1); + dim3 dim_grid(pages.size(), 1); // 1 threadblock per page + + preprocess_string_offsets_kernel + <<>>( + pages.device_ptr(), chunks, page_string_offset_indices, page_mask, min_row, num_rows); +} + } // namespace cudf::io::parquet::detail diff --git a/cpp/src/io/parquet/page_string_utils.cuh b/cpp/src/io/parquet/page_string_utils.cuh index db72e414cba..98209a9b09a 100644 --- a/cpp/src/io/parquet/page_string_utils.cuh +++ b/cpp/src/io/parquet/page_string_utils.cuh @@ -234,12 +234,18 @@ __device__ inline int calc_threads_per_string_log2(int avg_string_length) // re * @param string_output_offset Starting offset into the output column data for writing */ template -__device__ size_t decode_strings( - page_state_s* s, state_buf* const sb, int start, int end, int t, size_t string_output_offset) +__device__ size_t decode_strings(page_state_s* s, + state_buf* const sb, + int start, + int end, + int t, + uint32_t* str_offsets, + size_t string_output_offset) { // nesting level that is storing actual leaf values int const leaf_level_index = s->col.max_nesting_depth - 1; @@ -276,18 +282,43 @@ __device__ size_t decode_strings( }(); // lookup input string pointer & length. store length. - bool const in_range = (thread_pos < target_pos) && (dst_pos >= 0); - auto [thread_input_string, string_length] = [&]() { + bool const in_range = (thread_pos < target_pos) && (dst_pos >= 0); + auto const [thread_input_string, string_length] = [&]() { // target_pos will always be properly bounded by num_rows, but dst_pos may be negative (values // before first_row) in the flat hierarchy case. if (!in_range) { return string_index_pair{nullptr, 0}; } - string_index_pair string_pair = gpuGetStringData(s, sb, src_pos); - int32_t* str_len_ptr = reinterpret_cast(ni.data_out) + dst_pos; - *str_len_ptr = string_pair.second; - return string_pair; + if constexpr (has_dict_t) { + return gpuGetStringData(s, sb, src_pos); + } else { + int input_thread_string_offset; + int string_length; + if (s->col.physical_type == Type::FIXED_LEN_BYTE_ARRAY) { + input_thread_string_offset = (thread_pos + skipped_leaf_values) * s->dtype_len_in; + string_length = s->dtype_len_in; + } else { + input_thread_string_offset = str_offsets[thread_pos]; + int const next_offset = str_offsets[thread_pos + 1]; + // The memory is laid out as: 4-byte length, string, 4-byte length, string, ... + // String length = subtract the offsets and the stored length of the next string + // Except at the end of the dictionary, where the last string offset is repeated. + string_length = (next_offset == input_thread_string_offset) + ? 0 + : next_offset - input_thread_string_offset - sizeof(int32_t); + } + if (input_thread_string_offset >= static_cast(s->dict_size)) { + return string_index_pair{nullptr, 0}; + } + auto const thread_input_string = + reinterpret_cast(s->data_start + input_thread_string_offset); + return string_index_pair{thread_input_string, string_length}; + } }(); + if (in_range) { + int32_t* str_len_ptr = reinterpret_cast(ni.data_out) + dst_pos; + *str_len_ptr = string_length; + } - // compute string offsets + // compute output string offsets size_t thread_string_offset, block_total_string_length; { using scanner = cub::BlockScan; diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 20c9de97106..0246479334a 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -460,6 +460,7 @@ struct ColumnChunkDesc { bitmask_type** valid_map_base{}; // base pointers of valid bit map for this column void** column_data_base{}; // base pointers of column data void** column_string_base{}; // base pointers of column string data + uint32_t* column_string_offset_base{}; // base pointer of column string offset data Compression codec{}; // compressed codec enum std::optional logical_type{}; // logical type int32_t ts_clock_rate{}; // output timestamp clock frequency (0=default, 1000=ms, 1000000000=ns) @@ -932,6 +933,28 @@ void decode_delta_length_byte_array(cudf::detail::hostdevice_span page kernel_error::pointer error_code, rmm::cuda_stream_view stream); +/** + * @brief Launches pre-processing kernel to fill string offsets for non-dictionary columns + * + * This kernel runs before the main decode kernel to pre-compute string offsets + * for columns that use plain encoding without dictionaries. + * + * @param[in,out] pages All pages to be processed + * @param[in] chunks All chunks to be processed + * @param[in] page_string_offset_indices Device span of page string offset indices + * @param[in] page_mask Boolean vector indicating which pages need to be processed + * @param[in] min_row Minimum row index to read + * @param[in] num_rows Number of rows to read starting from min_row + * @param[in] stream CUDA stream to use + */ +void preprocess_string_offsets(cudf::detail::hostdevice_span pages, + cudf::detail::hostdevice_span chunks, + cudf::device_span page_string_offset_indices, + cudf::device_span page_mask, + size_t min_row, + size_t num_rows, + rmm::cuda_stream_view stream); + /** * @brief Launches kernel for reading non-dictionary fixed width column data stored in the pages * @@ -946,6 +969,7 @@ void decode_delta_length_byte_array(cudf::detail::hostdevice_span page * @param[in] kernel_mask Mask indicating the type of decoding kernel to launch. * @param[in] page_mask Boolean vector indicating which pages need to be decoded * @param[out] initial_str_offsets Vector to store the initial offsets for large nested string cols + * @param[in] page_string_offset_indices Device span of page string offset indices * @param[out] error_code Error code for kernel failures * @param[in] stream CUDA stream to use */ @@ -957,6 +981,7 @@ void decode_page_data(cudf::detail::hostdevice_span pages, decode_kernel_mask kernel_mask, cudf::device_span page_mask, cudf::device_span initial_str_offsets, + cudf::device_span page_string_offset_indices, kernel_error::pointer error_code, rmm::cuda_stream_view stream); diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 470420a6775..1256667c5c3 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -189,6 +189,10 @@ void reader_impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num_ initial_str_offsets = cudf::detail::make_device_uvector_async(host_offsets_vector, _stream, _mr); chunk_nested_str_data.host_to_device_async(_stream); + + // Allocate string offset buffers and get string offsets for non-dictionary, non-FLBA string + // columns + compute_page_string_offset_indices(skip_rows, num_rows); } // create this before we fork streams @@ -209,6 +213,7 @@ void reader_impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num_ decoder_mask, _subpass_page_mask, initial_str_offsets, + _page_string_offset_indices, error_code.data(), streams[s_idx++]); }; @@ -468,12 +473,18 @@ void reader_impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num_ } } + // Clear string offset buffers to free device memory + _page_string_offset_indices.resize(0, _stream); + _string_offset_buffer.resize(0, _stream); + _stream.synchronize(); } reader_impl::reader_impl() : _options{}, - _subpass_page_mask{cudf::detail::hostdevice_vector(0, cudf::get_default_stream())} + _subpass_page_mask{cudf::detail::hostdevice_vector(0, cudf::get_default_stream())}, + _string_offset_buffer{0, cudf::get_default_stream()}, + _page_string_offset_indices{0, cudf::get_default_stream()} { } @@ -507,6 +518,8 @@ reader_impl::reader_impl(std::size_t chunk_read_limit, options.is_enabled_use_jit_filter()}, _sources{std::move(sources)}, _subpass_page_mask{cudf::detail::hostdevice_vector(0, _stream)}, + _string_offset_buffer{0, _stream}, + _page_string_offset_indices{0, _stream}, _output_chunk_read_limit{chunk_read_limit}, _input_pass_read_limit{pass_read_limit} { diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index 0532487fb2d..4c03cdba19a 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ b/cpp/src/io/parquet/reader_impl.hpp @@ -249,6 +249,19 @@ class reader_impl { */ void preprocess_subpass_pages(read_mode mode, size_t chunk_read_limit); + /** + * @brief Set page string offset indices for non-dictionary, non-FLBA string columns. + * + * This function calculates the string offset index for each page of non-dictionary, non-FLBA + * string columns and populates the _page_string_offset_indices member variable. + * The indices are used by decode kernels to access pre-computed string offsets. + * + * @param skip_rows The number of rows to skip in this subpass + * @param num_rows The number of rows to read in this subpass + * @param page_mask The page mask for this subpass + */ + void compute_page_string_offset_indices(size_t skip_rows, size_t num_rows); + /** * @brief Allocate nesting information storage for all pages and set pointers to it. * @@ -441,6 +454,14 @@ class reader_impl { // Page mask for filtering out subpass data pages (Copied to the device) cudf::detail::hostdevice_vector _subpass_page_mask; + // String offset buffer for non-dictionary, non-FLBA string columns + // Contains pre-computed offsets into the string data + rmm::device_uvector _string_offset_buffer; + + // For each page, the index into the column's string offset buffer + // Used for non-dictionary, non-FLBA string columns + rmm::device_uvector _page_string_offset_indices; + // _output_buffers associated metadata std::unique_ptr _output_metadata; diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu index 45add882036..eabef9c4e7e 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.cu +++ b/cpp/src/io/parquet/reader_impl_chunking.cu @@ -264,13 +264,15 @@ void reader_impl::setup_next_subpass(read_mode mode) cuda::std::equal_to{}, cumulative_page_sum{}); - // include scratch space needed for decompression. for certain codecs (eg ZSTD) this - // can be considerable. + // include scratch space needed for decompression and string offset buffers. + // for certain codecs (eg ZSTD) this an be considerable. if (is_first_subpass) { pass.decomp_scratch_sizes = compute_decompression_scratch_sizes(pass.chunks, pass.pages, _stream); + pass.string_offset_sizes = compute_string_offset_sizes(pass.chunks, pass.pages, _stream); } - include_decompression_scratch_size(pass.decomp_scratch_sizes, c_info, _stream); + include_scratch_size(pass.decomp_scratch_sizes, c_info, _stream); + include_scratch_size(pass.string_offset_sizes, c_info, _stream); auto iter = thrust::make_counting_iterator(0); auto const pass_max_row = pass.skip_rows + pass.num_rows; diff --git a/cpp/src/io/parquet/reader_impl_chunking.hpp b/cpp/src/io/parquet/reader_impl_chunking.hpp index 8b4f0a7cee1..ae59d99eb7d 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.hpp +++ b/cpp/src/io/parquet/reader_impl_chunking.hpp @@ -150,6 +150,7 @@ struct pass_intermediate_data { rmm::device_buffer decomp_dict_data{0, cudf::get_default_stream()}; rmm::device_uvector decomp_scratch_sizes{0, cudf::get_default_stream()}; + rmm::device_uvector string_offset_sizes{0, cudf::get_default_stream()}; rmm::device_uvector str_dict_index{0, cudf::get_default_stream()}; int level_type_size{0}; diff --git a/cpp/src/io/parquet/reader_impl_chunking_utils.cu b/cpp/src/io/parquet/reader_impl_chunking_utils.cu index 8076c32a520..ccd1de2f892 100644 --- a/cpp/src/io/parquet/reader_impl_chunking_utils.cu +++ b/cpp/src/io/parquet/reader_impl_chunking_utils.cu @@ -801,9 +801,9 @@ rmm::device_uvector compute_decompression_scratch_sizes( return d_temp_cost; } -void include_decompression_scratch_size(device_span temp_cost, - device_span c_info, - rmm::cuda_stream_view stream) +void include_scratch_size(device_span temp_cost, + device_span c_info, + rmm::cuda_stream_view stream) { auto iter = thrust::make_counting_iterator(size_t{0}); thrust::for_each(rmm::exec_policy_nosync(stream), @@ -814,4 +814,66 @@ void include_decompression_scratch_size(device_span temp_cost, }); } +namespace { + +/** + * @brief Functor to compute the string offset buffer size needed for a page. + * + * This computes the memory needed to store string offsets (uint32_t per value) + * for non-dictionary, non-FLBA string pages. + */ +struct compute_page_string_offset_size { + device_span pages; + device_span chunks; + + __device__ size_t operator()(size_t page_idx) const + { + // Mask for non-dictionary, non-delta string columns (same as used in decode kernel) + constexpr uint32_t STRINGS_MASK_NON_DELTA_NON_DICT = + BitOr(decode_kernel_mask::STRING, + decode_kernel_mask::STRING_NESTED, + decode_kernel_mask::STRING_LIST, + decode_kernel_mask::STRING_STREAM_SPLIT, + decode_kernel_mask::STRING_STREAM_SPLIT_NESTED, + decode_kernel_mask::STRING_STREAM_SPLIT_LIST); + + auto const& page = pages[page_idx]; + auto const& chunk = chunks[page.chunk_idx]; + + // Check if this page is a non-dictionary string page using kernel mask + if (BitAnd(page.kernel_mask, STRINGS_MASK_NON_DELTA_NON_DICT) == 0) { return 0; } + + // Fixed length byte array: Offsets are fixed, no need to allocate offset buffer + if (chunk.physical_type == Type::FIXED_LEN_BYTE_ARRAY) { return 0; } + + // Estimate number of offsets based on page.num_input_values + // This is an upper bound estimate since we compute this before knowing: + // - exact batch sizes for list columns after preprocessing + // - which rows will be skipped due to subpass boundaries + // We add 1 for the final offset + auto const num_offsets = page.num_input_values + 1; + + // Return size in bytes (uint32_t per offset) + return num_offsets * sizeof(uint32_t); + } +}; + +} // anonymous namespace + +rmm::device_uvector compute_string_offset_sizes(device_span chunks, + device_span pages, + rmm::cuda_stream_view stream) +{ + rmm::device_uvector string_offset_sizes(pages.size(), stream); + + auto iter = thrust::make_counting_iterator(size_t{0}); + thrust::transform(rmm::exec_policy_nosync(stream), + iter, + iter + pages.size(), + string_offset_sizes.begin(), + compute_page_string_offset_size{pages, chunks}); + + return string_offset_sizes; +} + } // namespace cudf::io::parquet::detail diff --git a/cpp/src/io/parquet/reader_impl_chunking_utils.cuh b/cpp/src/io/parquet/reader_impl_chunking_utils.cuh index 6a279c262da..7918da9270a 100644 --- a/cpp/src/io/parquet/reader_impl_chunking_utils.cuh +++ b/cpp/src/io/parquet/reader_impl_chunking_utils.cuh @@ -224,13 +224,23 @@ rmm::device_uvector compute_decompression_scratch_sizes( device_span pages, rmm::cuda_stream_view stream); +/** + * @brief Computes the per-page buffer sizes required for string offsets. + * + * For non-dictionary, non-FLBA string columns, this computes the size needed + * to store string offsets (uint32_t per value) for each page. + */ +rmm::device_uvector compute_string_offset_sizes(device_span chunks, + device_span pages, + rmm::cuda_stream_view stream); + /** * @brief Add the cost of decompression codec scratch space to the per-page cumulative * size information */ -void include_decompression_scratch_size(device_span pages, - device_span c_info, - rmm::cuda_stream_view stream); +void include_scratch_size(device_span pages, + device_span c_info, + rmm::cuda_stream_view stream); /** * @brief Struct to store split information diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 52644ef1f81..4cbc9e97151 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -278,6 +278,146 @@ void reader_impl::allocate_level_decode_space() } } +namespace { + +/** + * @brief Functor to compute the number of string offsets needed for each page. + * + * For list columns, this uses the batch_size computed during preprocessing from the nesting + * information, which is only accessible on the device. For non-list columns, it estimates based + * on row counts. + */ +struct compute_page_offset_count { + device_span pages; + device_span chunks; + size_t skip_rows; + size_t num_rows; + + __device__ size_t operator()(size_t page_idx) const + { + // Mask for non-dictionary, non-delta string columns (same as used in decode kernel) + constexpr uint32_t STRINGS_MASK_NON_DELTA_NON_DICT = + BitOr(decode_kernel_mask::STRING, + decode_kernel_mask::STRING_NESTED, + decode_kernel_mask::STRING_LIST, + decode_kernel_mask::STRING_STREAM_SPLIT, + decode_kernel_mask::STRING_STREAM_SPLIT_NESTED, + decode_kernel_mask::STRING_STREAM_SPLIT_LIST); + + // Mask for pages with lists (repetition levels) + constexpr uint32_t STRINGS_WITH_LISTS_MASK = + BitOr(decode_kernel_mask::STRING_LIST, decode_kernel_mask::STRING_STREAM_SPLIT_LIST); + + auto const& page = pages[page_idx]; + auto const& chunk = chunks[page.chunk_idx]; + + // Check if this page is a non-dictionary string page using kernel mask + if (BitAnd(page.kernel_mask, STRINGS_MASK_NON_DELTA_NON_DICT) == 0) { return 0; } + + // Fixed length byte array: Offsets are fixed, no need to preprocess + if (chunk.physical_type == Type::FIXED_LEN_BYTE_ARRAY) { return 0; } + + auto const page_start_row = chunk.start_row + page.chunk_row; + auto const page_end_row = page_start_row + page.num_rows; + auto const subpass_start_row = skip_rows; + auto const subpass_end_row = subpass_start_row + num_rows; + + if ((page_end_row <= subpass_start_row) || (page_start_row >= subpass_end_row)) { + return 0; // will skip the page + } + + // Check if this column is a list type + bool const is_list_col = BitAnd(page.kernel_mask, STRINGS_WITH_LISTS_MASK) != 0; + + size_t page_num_values; + if (is_list_col) { + // For list columns, use batch_size computed during preprocessing + // batch_size is at the leaf level (highest nesting depth) + auto const leaf_depth = page.num_output_nesting_levels - 1; + page_num_values = page.nesting[leaf_depth].batch_size; + } else { + // For non-list columns, we don't know how many values we'll read, because we don't know + // how many nulls we'll skip. So we have to read through the skipped rows on the page. + auto const read_end_row = min(page_end_row, subpass_end_row); + page_num_values = read_end_row - page_start_row; + } + + // add 1 for the final offset if we have any values + return page_num_values + (page_num_values > 0 ? 1 : 0); + } +}; + +} // namespace + +void reader_impl::compute_page_string_offset_indices(size_t skip_rows, size_t num_rows) +{ + auto& pass = *_pass_itm_data; + auto& subpass = *pass.subpass; + + auto preprocess_offsets = [&](auto& chunk) { + // Only BYTE_ARRAY strings (not DECIMAL) + if (chunk.physical_type != Type::BYTE_ARRAY) { return false; } + if (chunk.logical_type.has_value() && (chunk.logical_type->type == LogicalType::DECIMAL)) { + return false; + } + + // The encoding can be different for different pages in a column. + // If there are any data pages (not just dictionary pages), we need to preprocess. + return chunk.num_data_pages > 0; + }; + + // Compute the number of offsets per page on the GPU using batch_size from nesting info + auto const num_pages = subpass.pages.size(); + rmm::device_uvector d_page_offset_counts(num_pages, _stream); + + thrust::transform(rmm::exec_policy_nosync(_stream), + thrust::counting_iterator(0), + thrust::counting_iterator(num_pages), + d_page_offset_counts.begin(), + compute_page_offset_count{subpass.pages, pass.chunks, skip_rows, num_rows}); + + // Compute prefix sum (exclusive scan) to get indices for each page + _page_string_offset_indices = rmm::device_uvector(num_pages, _stream); + thrust::exclusive_scan(rmm::exec_policy_nosync(_stream), + d_page_offset_counts.begin(), + d_page_offset_counts.end(), + _page_string_offset_indices.begin()); + + // Compute the total number of offsets needed + size_t total_num_offsets = thrust::reduce( + rmm::exec_policy_nosync(_stream), d_page_offset_counts.begin(), d_page_offset_counts.end()); + + _stream.synchronize(); + + // Allocate the string offset buffer + _string_offset_buffer = rmm::device_uvector(total_num_offsets, _stream, _mr); + + // Set the string offset buffer for non-dictionary, non-FLBA string columns + for (size_t col_idx = 0; col_idx < pass.chunks.size(); ++col_idx) { + auto& chunk = pass.chunks[col_idx]; + // Check if this is a string column without dictionary & not a fixed length byte array + if (preprocess_offsets(chunk)) { + chunk.column_string_offset_base = _string_offset_buffer.data(); + } + } + + // Transfer the updated chunks to device + pass.chunks.host_to_device_async(_stream); + _stream.synchronize(); + + // Pre-process string offsets for non-dictionary string columns + detail::preprocess_string_offsets(subpass.pages, + pass.chunks, + _page_string_offset_indices, + _subpass_page_mask, + skip_rows, + num_rows, + _stream); + + // Wait for string offset preprocessing to complete before launching decode kernels + _stream.synchronize(); +} + std::pair> reader_impl::read_column_chunks() { auto const& row_groups_info = _pass_itm_data->row_groups;