Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cpp/src/io/avro/avro_gpu.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2019-2024, NVIDIA CORPORATION.
* SPDX-FileCopyrightText: Copyright (c) 2019-2025, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*/
#include "avro_gpu.hpp"
Expand Down Expand Up @@ -182,7 +182,7 @@ avro_decode_row(schemadesc_s const* schema,
if (dataptr != nullptr && dst_row >= 0) {
uint32_t v;
if (cur + 3 < end) {
v = unaligned_load32(cur);
v = unaligned_load<uint32_t>(cur);
cur += 4;
} else {
v = 0;
Expand All @@ -198,7 +198,7 @@ avro_decode_row(schemadesc_s const* schema,
if (dataptr != nullptr && dst_row >= 0) {
uint64_t v;
if (cur + 7 < end) {
v = unaligned_load64(cur);
v = unaligned_load<uint64_t>(cur);
cur += 8;
} else {
v = 0;
Expand Down
17 changes: 3 additions & 14 deletions cpp/src/io/comp/snap.cu
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,6 @@ static inline __device__ uint32_t snap_hash(uint32_t v)
return (v * ((1 << 20) + (0x2a00) + (0x6a) + 1)) >> (32 - hash_bits);
}

/**
 * @brief Reads a 32-bit value starting at a byte pointer of arbitrary alignment
 *
 * The pointer is rounded down to the enclosing 4-byte boundary and the word at
 * that boundary is always loaded. If `src` was already aligned that word is the
 * result. Otherwise the following word is loaded as well and the two are
 * combined with `__funnelshift_r`, shifting right by 8 bits per byte of
 * misalignment.
 *
 * Note that the loads are whole aligned words, so up to three bytes before
 * `src` and, in the misaligned case, bytes past `src + 4` up to the end of the
 * second word are also read.
 *
 * @param src Pointer to the first of the four bytes to fetch
 * @return The 32-bit value assembled from the aligned word(s) covering `src`
 */
static inline __device__ uint32_t fetch4(uint8_t const* src)
{
  // Distance (0..3) of src from the preceding 4-byte boundary
  auto const byte_ofs = static_cast<uint32_t>(reinterpret_cast<uintptr_t>(src) & 3);
  auto const* words   = reinterpret_cast<uint32_t const*>(src - byte_ofs);
  uint32_t const lo   = words[0];

  // Aligned pointer: the first word already holds all four bytes
  if (byte_ofs == 0) { return lo; }

  // Misaligned pointer: stitch the result together from two adjacent words
  return __funnelshift_r(lo, words[1], byte_ofs * 8);
}

/**
* @brief Outputs a snappy literal symbol
*
Expand Down Expand Up @@ -170,7 +159,7 @@ static __device__ uint32_t FindFourByteMatch(snap_state_s* s,
if (t == 0) { s->copy_length = 0; }
do {
bool valid4 = (pos + t + 4 <= len);
uint32_t data32 = (valid4) ? fetch4(src + pos + t) : 0;
uint32_t data32 = (valid4) ? cudf::io::unaligned_load<uint32_t>(src + pos + t) : 0;
uint32_t hash = (valid4) ? snap_hash(data32) : 0;
uint32_t local_match = HashMatchAny(hash, t);
uint32_t local_match_lane = 31 - __clz(local_match & ((1 << t) - 1));
Expand All @@ -183,8 +172,8 @@ static __device__ uint32_t FindFourByteMatch(snap_state_s* s,
} else {
offset = (pos & ~0xffff) | s->hash_map[hash];
if (offset >= pos) { offset = (offset >= 0x1'0000) ? offset - 0x1'0000 : pos; }
match =
(offset < pos && offset + max_copy_distance >= pos + t && fetch4(src + offset) == data32);
match = (offset < pos && offset + max_copy_distance >= pos + t &&
cudf::io::unaligned_load<uint32_t>(src + offset) == data32);
}
} else {
match = 0;
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/io/comp/unsnap.cu
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ __device__ void snappy_decode_symbols(unsnap_state_s* s, uint32_t t)
b[t].offset = ofs;
ofs += blen; // for correct out-of-range detection below
}
blen = WarpReducePos32(blen, t);
blen = warp_reduce_pos<cudf::detail::warp_size>(blen, t);
bytes_left = shuffle(bytes_left);
dst_pos = shuffle(dst_pos);
short_sym_mask = __ffs(ballot(blen > bytes_left || ofs > (int32_t)(dst_pos + blen)));
Expand Down Expand Up @@ -374,7 +374,7 @@ __device__ void snappy_decode_symbols(unsnap_state_s* s, uint32_t t)
b[batch_len + t].offset = ofs;
ofs += blen; // for correct out-of-range detection below
}
blen = WarpReducePos32(blen, t);
blen = warp_reduce_pos<cudf::detail::warp_size>(blen, t);
bytes_left = shuffle(bytes_left);
dst_pos = shuffle(dst_pos);
short_sym_mask = __ffs(ballot(blen > bytes_left || ofs > (int32_t)(dst_pos + blen)));
Expand Down Expand Up @@ -521,7 +521,7 @@ __device__ void snappy_process_symbols(unsnap_state_s* s, int t, Storage& temp_s
if (shuffle(min((uint32_t)dist_t, (uint32_t)shuffle_xor(dist_t, 1))) > 8) {
uint32_t n;
do {
uint32_t bofs = WarpReducePos32(blen_t, t);
uint32_t bofs = warp_reduce_pos<32>(blen_t, t);
uint32_t stop_mask = ballot((uint32_t)dist_t < bofs);
uint32_t start_mask =
cub::WarpReduce<uint32_t>(temp_storage).Sum((bofs < 32 && t < batch_len) ? 1 << bofs : 0);
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/decode_fixed.cu
Original file line number Diff line number Diff line change
Expand Up @@ -670,8 +670,8 @@ __device__ int update_validity_and_row_indices_lists(int32_t target_value_count,
// Not all values visited by this block will represent a value at this nesting level.
// the validity bit for thread t might actually represent output value t-6.
// the correct position for thread t's bit is thread_value_count.
uint32_t const warp_valid_mask =
WarpReduceOr32((uint32_t)is_valid << thread_value_count_within_warp);
uint32_t const warp_valid_mask = warp_reduce_or<cudf::detail::warp_size>(
static_cast<uint32_t>(is_valid) << thread_value_count_within_warp);
int thread_valid_count, block_valid_count;
{
auto thread_mask = (uint32_t(1) << thread_value_count_within_warp) - 1;
Expand Down
18 changes: 9 additions & 9 deletions cpp/src/io/parquet/experimental/dictionary_page_filter.cu
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ __device__ __forceinline__ void decode_int96timestamp(uint8_t const* int96_ptr,
// Note: This function has been modified from the original at
// https://github.com/rapidsai/cudf/blob/c89c83c00c729a86c56570693b627f31408bc2c9/cpp/src/io/parquet/page_data.cuh#L133-L198

int64_t nanos = cudf::io::unaligned_load64(int96_ptr);
int64_t days = cudf::io::unaligned_load32(int96_ptr + sizeof(int64_t));
int64_t nanos = cudf::io::unaligned_load<uint64_t>(int96_ptr);
int64_t days = cudf::io::unaligned_load<uint32_t>(int96_ptr + sizeof(int64_t));

// Convert from Julian day at noon to UTC seconds
cudf::duration_D duration_days{
Expand Down Expand Up @@ -489,7 +489,7 @@ __device__ T decode_fixed_width_value(PageInfo const& page,
// Handle timestamps
if constexpr (cudf::is_timestamp<T>()) {
auto const timestamp =
cudf::io::unaligned_load32(page_data + (value_idx * sizeof(uint32_t)));
cudf::io::unaligned_load<uint32_t>(page_data + (value_idx * sizeof(uint32_t)));
if (timestamp_scale != 0) {
decoded_value = T{typename T::duration(static_cast<typename T::rep>(timestamp))};
} else {
Expand All @@ -513,7 +513,7 @@ __device__ T decode_fixed_width_value(PageInfo const& page,
}

auto const duration =
cudf::io::unaligned_load32(page_data + (value_idx * sizeof(uint32_t)));
cudf::io::unaligned_load<uint32_t>(page_data + (value_idx * sizeof(uint32_t)));
decoded_value = T{static_cast<typename T::rep>(duration)};
}
// Handle other int32 encoded values including smaller bitwidths and decimal32
Expand All @@ -523,8 +523,8 @@ __device__ T decode_fixed_width_value(PageInfo const& page,
set_error(error, decode_error::INVALID_DATA_TYPE);
return {};
}
decoded_value =
static_cast<T>(cudf::io::unaligned_load32(page_data + (value_idx * sizeof(uint32_t))));
decoded_value = static_cast<T>(
cudf::io::unaligned_load<uint32_t>(page_data + (value_idx * sizeof(uint32_t))));
}
break;
}
Expand All @@ -538,7 +538,7 @@ __device__ T decode_fixed_width_value(PageInfo const& page,
// Handle timestamps
if constexpr (cudf::is_timestamp<T>()) {
int64_t const timestamp =
cudf::io::unaligned_load64(page_data + (value_idx * sizeof(int64_t)));
cudf::io::unaligned_load<uint64_t>(page_data + (value_idx * sizeof(int64_t)));
if (timestamp_scale != 0) {
decoded_value = T{typename T::duration(
static_cast<typename T::rep>(convert_to_timestamp64(timestamp, timestamp_scale)))};
Expand All @@ -548,8 +548,8 @@ __device__ T decode_fixed_width_value(PageInfo const& page,
}
// Handle durations and other int64 encoded values including decimal64
else {
decoded_value =
static_cast<T>(cudf::io::unaligned_load64(page_data + (value_idx * sizeof(uint64_t))));
decoded_value = static_cast<T>(
cudf::io::unaligned_load<uint64_t>(page_data + (value_idx * sizeof(uint64_t))));
}
break;
}
Expand Down
13 changes: 2 additions & 11 deletions cpp/src/io/parquet/page_data.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,11 @@ inline __device__ void gpuStoreOutput(uint32_t* dst,
uint32_t dict_pos,
uint32_t dict_size)
{
uint32_t bytebuf;
unsigned int ofs = 3 & reinterpret_cast<size_t>(src8);
src8 -= ofs; // align to 32-bit boundary
ofs <<= 3; // bytes -> bits
if (dict_pos < dict_size) {
bytebuf = *reinterpret_cast<uint32_t const*>(src8 + dict_pos);
if (ofs) {
uint32_t bytebufnext = *reinterpret_cast<uint32_t const*>(src8 + dict_pos + 4);
bytebuf = __funnelshift_r(bytebuf, bytebufnext, ofs);
}
*dst = cudf::io::unaligned_load<uint32_t>(src8 + dict_pos);
} else {
bytebuf = 0;
*dst = 0;
}
*dst = bytebuf;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_input_value
// the validity bit for thread t might actually represent output value t-6. the correct
// position for thread t's bit is thread_value_count. for cuda 11 we could use
// __reduce_or_sync(), but until then we have to do a warp reduce.
WarpReduceOr32(is_valid << thread_value_count);
warp_reduce_or<cudf::detail::warp_size>(is_valid << thread_value_count);

thread_valid_count = __popc(warp_valid_mask & ((1 << thread_value_count) - 1));
warp_valid_count = __popc(warp_valid_mask);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/page_delta_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
// finally, copy the string data into place
auto const dst = nesting_info_base[leaf_level_index].string_out;
auto const src = page_string_data + string_offset;
memcpy_block<decode_block_size, true>(dst, src, s->page.str_bytes, block.thread_rank());
memcpy_block<decode_block_size, true>(dst, src, s->page.str_bytes, block);

if (block.thread_rank() == 0 and s->error != 0) { set_error(s->error, error_code); }
}
Expand Down
12 changes: 8 additions & 4 deletions cpp/src/io/parquet/page_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

#include <cooperative_groups.h>
#include <cub/cub.cuh>
#include <cuda/std/chrono>
#include <cuda/std/functional>
Expand Down Expand Up @@ -2216,7 +2217,8 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
auto const output_ptr = packer.flush();

// now copy the char data
memcpy_block<block_size, true>(output_ptr, first_string, string_data_len, t);
memcpy_block<block_size, true>(
output_ptr, first_string, string_data_len, cooperative_groups::this_thread_block());

finish_page_encode<block_size>(
s, output_ptr + string_data_len, pages, comp_in, comp_out, comp_results, true);
Expand Down Expand Up @@ -3051,6 +3053,8 @@ CUDF_KERNEL void __launch_bounds__(encode_block_size)
// blockDim(1024, 1, 1)
CUDF_KERNEL void __launch_bounds__(1024) gpuGatherPages(device_span<EncColumnChunk> chunks)
{
namespace cg = cooperative_groups;

__shared__ __align__(8) EncColumnChunk ck_g;
__shared__ __align__(8) EncPage page_g;

Expand Down Expand Up @@ -3078,20 +3082,20 @@ CUDF_KERNEL void __launch_bounds__(1024) gpuGatherPages(device_span<EncColumnChu
src = ck_g.is_compressed ? page_g.compressed_data : page_g.page_data;
// Copy page header
hdr_len = page_g.hdr_size;
memcpy_block<1024, true>(dst, src, hdr_len, t);
memcpy_block<1024, true>(dst, src, hdr_len, cg::this_thread_block());
src += page_g.max_hdr_size;
dst += hdr_len;
uncompressed_size += hdr_len;
data_len = ck_g.is_compressed ? page_g.comp_data_size : page_g.data_size;
// Copy page data. For V2, the level data and page data are disjoint.
if (page_g.is_v2()) {
auto const lvl_len = page_g.level_bytes();
memcpy_block<1024, true>(dst, src, lvl_len, t);
memcpy_block<1024, true>(dst, src, lvl_len, cg::this_thread_block());
src += page_g.max_lvl_size;
dst += lvl_len;
data_len -= lvl_len;
}
memcpy_block<1024, true>(dst, src, data_len, t);
memcpy_block<1024, true>(dst, src, data_len, cg::this_thread_block());
dst += data_len;
__syncthreads();
if (t == 0 && page == 0 && ck_g.use_dictionary) { ck_g.dictionary_size = hdr_len + data_len; }
Expand Down
Loading
Loading