6 changes: 3 additions & 3 deletions cpp/src/io/avro/avro_gpu.cu
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: Copyright (c) 2019-2024, NVIDIA CORPORATION.
+ * SPDX-FileCopyrightText: Copyright (c) 2019-2025, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*/
#include "avro_gpu.hpp"
@@ -182,7 +182,7 @@ avro_decode_row(schemadesc_s const* schema,
if (dataptr != nullptr && dst_row >= 0) {
uint32_t v;
if (cur + 3 < end) {
-        v = unaligned_load32(cur);
+        v = unaligned_load<uint32_t>(cur);
cur += 4;
} else {
v = 0;
@@ -198,7 +198,7 @@ avro_decode_row(schemadesc_s const* schema,
if (dataptr != nullptr && dst_row >= 0) {
uint64_t v;
if (cur + 7 < end) {
-        v = unaligned_load64(cur);
+        v = unaligned_load<uint64_t>(cur);
cur += 8;
} else {
v = 0;
17 changes: 3 additions & 14 deletions cpp/src/io/comp/snap.cu
@@ -37,17 +37,6 @@ static inline __device__ uint32_t snap_hash(uint32_t v)
return (v * ((1 << 20) + (0x2a00) + (0x6a) + 1)) >> (32 - hash_bits);
}

-/**
- * @brief Fetches four consecutive bytes
- */
-static inline __device__ uint32_t fetch4(uint8_t const* src)
-{
-  uint32_t src_align = 3 & reinterpret_cast<uintptr_t>(src);
-  auto const* src32  = reinterpret_cast<uint32_t const*>(src - src_align);
-  uint32_t v         = src32[0];
-  return (src_align) ? __funnelshift_r(v, src32[1], src_align * 8) : v;
-}
-
/**
* @brief Outputs a snappy literal symbol
*
@@ -170,7 +159,7 @@ static __device__ uint32_t FindFourByteMatch(snap_state_s* s,
if (t == 0) { s->copy_length = 0; }
do {
bool valid4 = (pos + t + 4 <= len);
-    uint32_t data32 = (valid4) ? fetch4(src + pos + t) : 0;
+    uint32_t data32 = (valid4) ? cudf::io::unaligned_load<uint32_t>(src + pos + t) : 0;
uint32_t hash = (valid4) ? snap_hash(data32) : 0;
uint32_t local_match = HashMatchAny(hash, t);
uint32_t local_match_lane = 31 - __clz(local_match & ((1 << t) - 1));
@@ -183,8 +172,8 @@
} else {
offset = (pos & ~0xffff) | s->hash_map[hash];
if (offset >= pos) { offset = (offset >= 0x1'0000) ? offset - 0x1'0000 : pos; }
-        match =
-          (offset < pos && offset + max_copy_distance >= pos + t && fetch4(src + offset) == data32);
+        match = (offset < pos && offset + max_copy_distance >= pos + t &&
+                 cudf::io::unaligned_load<uint32_t>(src + offset) == data32);
}
} else {
match = 0;
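A note on this replacement: the deleted `fetch4` emulated an unaligned read by stitching two aligned 32-bit loads together with `__funnelshift_r`, while the new `unaligned_load<uint32_t>` (added to `block_utils.cuh` below) delegates to `cuda::std::memcpy`, which the compiler lowers to an ordinary load when the size is a compile-time constant. A minimal host-side sketch of the memcpy idiom, with illustrative names not taken from the PR:

```cpp
#include <cstdint>
#include <cstring>

// Reading a uint32_t from an arbitrarily aligned pointer. Casting `p` to
// uint32_t const* and dereferencing would be undefined behavior when `p` is
// misaligned; memcpy of a constant size is well defined and compiles down to
// a single load on targets that support unaligned access.
uint32_t load32_any_alignment(uint8_t const* p)
{
  uint32_t v;
  std::memcpy(&v, p, sizeof(v));
  return v;
}

int main()
{
  uint8_t buf[8] = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
  // Offset 1 is misaligned for uint32_t; on a little-endian target this reads
  // bytes 02 03 04 05 -> 0x05040302.
  return load32_any_alignment(buf + 1) == 0x05040302u ? 0 : 1;
}
```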
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/decode_fixed.cu
@@ -670,8 +670,8 @@ __device__ int update_validity_and_row_indices_lists(int32_t target_value_count,
// Not all values visited by this block will represent a value at this nesting level.
// the validity bit for thread t might actually represent output value t-6.
// the correct position for thread t's bit is thread_value_count.
-    uint32_t const warp_valid_mask =
-      WarpReduceOr32((uint32_t)is_valid << thread_value_count_within_warp);
+    uint32_t const warp_valid_mask = warp_reduce_or<cudf::detail::warp_size>(
+      static_cast<uint32_t>(is_valid) << thread_value_count_within_warp);
int thread_valid_count, block_valid_count;
{
auto thread_mask = (uint32_t(1) << thread_value_count_within_warp) - 1;
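For readers unfamiliar with the pattern above: each thread contributes one validity bit, the warp-wide OR broadcasts the assembled mask to every lane, and a population count of the bits below a lane gives that lane's output slot. A compressed sketch of the idea, assuming the `warp_reduce_or` helper introduced in `block_utils.cuh` below (the function name here is illustrative, not from the PR):

```cpp
// Sketch: per-lane output offsets from a warp-wide validity mask.
__device__ int valid_slot_within_warp(bool is_valid, int lane)
{
  // OR together one bit per lane; after the reduction every lane holds the
  // full 32-bit validity mask for the warp.
  uint32_t const mask =
    cudf::io::warp_reduce_or<cudf::detail::warp_size>(static_cast<uint32_t>(is_valid) << lane);
  // Valid entries in lanes below this one == this lane's slot in the output.
  return __popc(mask & ((1u << lane) - 1));
}
```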
18 changes: 9 additions & 9 deletions cpp/src/io/parquet/experimental/dictionary_page_filter.cu
@@ -254,8 +254,8 @@ __device__ __forceinline__ void decode_int96timestamp(uint8_t const* int96_ptr,
// Note: This function has been modified from the original at
// https://github.com/rapidsai/cudf/blob/c89c83c00c729a86c56570693b627f31408bc2c9/cpp/src/io/parquet/page_data.cuh#L133-L198

-  int64_t nanos = cudf::io::unaligned_load64(int96_ptr);
-  int64_t days  = cudf::io::unaligned_load32(int96_ptr + sizeof(int64_t));
+  int64_t nanos = cudf::io::unaligned_load<uint64_t>(int96_ptr);
+  int64_t days  = cudf::io::unaligned_load<uint32_t>(int96_ptr + sizeof(int64_t));

// Convert from Julian day at noon to UTC seconds
cudf::duration_D duration_days{
@@ -489,7 +489,7 @@ __device__ T decode_fixed_width_value(PageInfo const& page,
// Handle timestamps
if constexpr (cudf::is_timestamp<T>()) {
auto const timestamp =
-          cudf::io::unaligned_load32(page_data + (value_idx * sizeof(uint32_t)));
+          cudf::io::unaligned_load<uint32_t>(page_data + (value_idx * sizeof(uint32_t)));
if (timestamp_scale != 0) {
decoded_value = T{typename T::duration(static_cast<typename T::rep>(timestamp))};
} else {
@@ -513,7 +513,7 @@ __device__ T decode_fixed_width_value(PageInfo const& page,
}

auto const duration =
-          cudf::io::unaligned_load32(page_data + (value_idx * sizeof(uint32_t)));
+          cudf::io::unaligned_load<uint32_t>(page_data + (value_idx * sizeof(uint32_t)));
decoded_value = T{static_cast<typename T::rep>(duration)};
}
// Handle other int32 encoded values including smaller bitwidths and decimal32
@@ -523,8 +523,8 @@ __device__ T decode_fixed_width_value(PageInfo const& page,
set_error(error, decode_error::INVALID_DATA_TYPE);
return {};
}
-        decoded_value =
-          static_cast<T>(cudf::io::unaligned_load32(page_data + (value_idx * sizeof(uint32_t))));
+        decoded_value = static_cast<T>(
+          cudf::io::unaligned_load<uint32_t>(page_data + (value_idx * sizeof(uint32_t))));
}
break;
}
@@ -538,7 +538,7 @@ __device__ T decode_fixed_width_value(PageInfo const& page,
// Handle timestamps
if constexpr (cudf::is_timestamp<T>()) {
int64_t const timestamp =
-          cudf::io::unaligned_load64(page_data + (value_idx * sizeof(int64_t)));
+          cudf::io::unaligned_load<uint64_t>(page_data + (value_idx * sizeof(int64_t)));
if (timestamp_scale != 0) {
decoded_value = T{typename T::duration(
static_cast<typename T::rep>(convert_to_timestamp64(timestamp, timestamp_scale)))};
@@ -548,8 +548,8 @@ __device__ T decode_fixed_width_value(PageInfo const& page,
}
// Handle durations and other int64 encoded values including decimal64
else {
-        decoded_value =
-          static_cast<T>(cudf::io::unaligned_load64(page_data + (value_idx * sizeof(uint64_t))));
+        decoded_value = static_cast<T>(
+          cudf::io::unaligned_load<uint64_t>(page_data + (value_idx * sizeof(uint64_t))));
}
break;
}
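As background for the INT96 path in this file: Parquet's INT96 timestamps store eight bytes of nanoseconds-within-day followed by four bytes of Julian day number, which is why `decode_int96timestamp` loads a 64-bit and a 32-bit value from consecutive offsets. A host-side sketch of the conversion to Unix nanoseconds, assuming the usual Julian-day epoch constant (names are illustrative):

```cpp
#include <cstdint>
#include <cstring>

// Julian day number of the Unix epoch (1970-01-01).
constexpr int64_t julian_unix_epoch = 2440588;
constexpr int64_t seconds_per_day   = 86400;

int64_t int96_to_unix_nanos(uint8_t const* p)
{
  int64_t nanos = 0;  // low 8 bytes: nanoseconds within the day
  int32_t days  = 0;  // next 4 bytes: Julian day number
  std::memcpy(&nanos, p, sizeof(nanos));
  std::memcpy(&days, p + sizeof(nanos), sizeof(days));
  int64_t const secs = (static_cast<int64_t>(days) - julian_unix_epoch) * seconds_per_day;
  return secs * 1'000'000'000 + nanos;
}
```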
13 changes: 2 additions & 11 deletions cpp/src/io/parquet/page_data.cuh
@@ -64,20 +64,11 @@ inline __device__ void gpuStoreOutput(uint32_t* dst,
uint32_t dict_pos,
uint32_t dict_size)
{
-  uint32_t bytebuf;
-  unsigned int ofs = 3 & reinterpret_cast<size_t>(src8);
-  src8 -= ofs;  // align to 32-bit boundary
-  ofs <<= 3;    // bytes -> bits
   if (dict_pos < dict_size) {
-    bytebuf = *reinterpret_cast<uint32_t const*>(src8 + dict_pos);
-    if (ofs) {
-      uint32_t bytebufnext = *reinterpret_cast<uint32_t const*>(src8 + dict_pos + 4);
-      bytebuf              = __funnelshift_r(bytebuf, bytebufnext, ofs);
-    }
+    *dst = cudf::io::unaligned_load<uint32_t>(src8 + dict_pos);
   } else {
-    bytebuf = 0;
+    *dst = 0;
   }
-  *dst = bytebuf;
}

/**
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/page_decode.cuh
@@ -841,7 +841,7 @@ __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_input_value
// the validity bit for thread t might actually represent output value t-6. the correct
// position for thread t's bit is thread_value_count. for cuda 11 we could use
// __reduce_or_sync(), but until then we have to do a warp reduce.
-      WarpReduceOr32(is_valid << thread_value_count);
+      warp_reduce_or<cudf::detail::warp_size>(is_valid << thread_value_count);

thread_valid_count = __popc(warp_valid_mask & ((1 << thread_value_count) - 1));
warp_valid_count = __popc(warp_valid_mask);
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/page_delta_decode.cu
@@ -858,7 +858,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
// finally, copy the string data into place
auto const dst = nesting_info_base[leaf_level_index].string_out;
auto const src = page_string_data + string_offset;
-    memcpy_block<decode_block_size, true>(dst, src, s->page.str_bytes, block.thread_rank());
+    memcpy_block<decode_block_size, true>(dst, src, s->page.str_bytes, block);

if (block.thread_rank() == 0 and s->error != 0) { set_error(s->error, error_code); }
}
12 changes: 8 additions & 4 deletions cpp/src/io/parquet/page_enc.cu
@@ -20,6 +20,7 @@
#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

+#include <cooperative_groups.h>
#include <cub/cub.cuh>
#include <cuda/std/chrono>
#include <cuda/std/functional>
@@ -2216,7 +2217,8 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
auto const output_ptr = packer.flush();

// now copy the char data
-  memcpy_block<block_size, true>(output_ptr, first_string, string_data_len, t);
+  memcpy_block<block_size, true>(
+    output_ptr, first_string, string_data_len, cooperative_groups::this_thread_block());

finish_page_encode<block_size>(
s, output_ptr + string_data_len, pages, comp_in, comp_out, comp_results, true);
@@ -3051,6 +3053,8 @@ CUDF_KERNEL void __launch_bounds__(encode_block_size)
// blockDim(1024, 1, 1)
CUDF_KERNEL void __launch_bounds__(1024) gpuGatherPages(device_span<EncColumnChunk> chunks)
{
+  namespace cg = cooperative_groups;
+
__shared__ __align__(8) EncColumnChunk ck_g;
__shared__ __align__(8) EncPage page_g;

@@ -3078,20 +3082,20 @@ CUDF_KERNEL void __launch_bounds__(1024) gpuGatherPages(device_span<EncColumnChu
src = ck_g.is_compressed ? page_g.compressed_data : page_g.page_data;
// Copy page header
hdr_len = page_g.hdr_size;
-    memcpy_block<1024, true>(dst, src, hdr_len, t);
+    memcpy_block<1024, true>(dst, src, hdr_len, cg::this_thread_block());
src += page_g.max_hdr_size;
dst += hdr_len;
uncompressed_size += hdr_len;
data_len = ck_g.is_compressed ? page_g.comp_data_size : page_g.data_size;
// Copy page data. For V2, the level data and page data are disjoint.
if (page_g.is_v2()) {
auto const lvl_len = page_g.level_bytes();
-      memcpy_block<1024, true>(dst, src, lvl_len, t);
+      memcpy_block<1024, true>(dst, src, lvl_len, cg::this_thread_block());
src += page_g.max_lvl_size;
dst += lvl_len;
data_len -= lvl_len;
}
-    memcpy_block<1024, true>(dst, src, data_len, t);
+    memcpy_block<1024, true>(dst, src, data_len, cg::this_thread_block());
dst += data_len;
__syncthreads();
if (t == 0 && page == 0 && ck_g.use_dictionary) { ck_g.dictionary_size = hdr_len + data_len; }
132 changes: 54 additions & 78 deletions cpp/src/io/utilities/block_utils.cuh
@@ -1,10 +1,15 @@
/*
- * SPDX-FileCopyrightText: Copyright (c) 2019-2023, NVIDIA CORPORATION.
+ * SPDX-FileCopyrightText: Copyright (c) 2019-2025, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once
-#include <cstdint>
+#include <cudf/detail/utilities/cuda.cuh>
+
+#include <cooperative_groups.h>
+#include <cub/cub.cuh>
+#include <cuda/std/cstdint>
+#include <cuda/std/cstring>

namespace cudf {
namespace io {
@@ -21,39 +26,20 @@ inline __device__ T shuffle_xor(T var, uint32_t delta)
return __shfl_xor_sync(~0, var, delta);
}

-inline __device__ void syncwarp() { __syncwarp(); }
-

[Author comment] Removed as it is not used anywhere.

inline __device__ uint32_t ballot(int pred) { return __ballot_sync(~0, pred); }

// Warp reduction helpers
-template <typename T>
-inline __device__ T WarpReduceOr2(T acc)
-{
-  return acc | shuffle_xor(acc, 1);
-}
-template <typename T>
-inline __device__ T WarpReduceOr4(T acc)
-{
-  acc = WarpReduceOr2(acc);
-  return acc | shuffle_xor(acc, 2);
-}
-template <typename T>
-inline __device__ T WarpReduceOr8(T acc)
-{
-  acc = WarpReduceOr4(acc);
-  return acc | shuffle_xor(acc, 4);
-}
-template <typename T>
-inline __device__ T WarpReduceOr16(T acc)
+template <cudf::size_type size, typename T>

[Author comment] Modernized these into one.

+inline __device__ T warp_reduce_or(T acc)
{
-  acc = WarpReduceOr8(acc);
-  return acc | shuffle_xor(acc, 8);
-}
-template <typename T>
-inline __device__ T WarpReduceOr32(T acc)
-{
-  acc = WarpReduceOr16(acc);
-  return acc | shuffle_xor(acc, 16);
+  static_assert(size >= 2 and size <= cudf::detail::warp_size and (size & (size - 1)) == 0,
+                "Size must be a power of 2 and less than or equal to the warp size");
+  if constexpr (size == 2) {
+    return acc | shuffle_xor(acc, 1);
+  } else {
+    acc = warp_reduce_or<size / 2>(acc);
+    return acc | shuffle_xor(acc, size / 2);
+  }
}

template <typename T>
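Worth spelling out what the consolidated template does: instantiated at warp width, the `if constexpr` recursion unrolls at compile time into the same butterfly of XOR shuffles as the old `WarpReduceOr32` chain. An illustrative expansion (not code from the PR):

```cpp
// What warp_reduce_or<32>(acc) boils down to once the recursion is inlined;
// shuffle_xor wraps __shfl_xor_sync with a full-warp mask, as defined above.
__device__ uint32_t warp_reduce_or_32_unrolled(uint32_t acc)
{
  acc |= __shfl_xor_sync(~0u, acc, 1);
  acc |= __shfl_xor_sync(~0u, acc, 2);
  acc |= __shfl_xor_sync(~0u, acc, 4);
  acc |= __shfl_xor_sync(~0u, acc, 8);
  acc |= __shfl_xor_sync(~0u, acc, 16);
  return acc;  // every lane now holds the OR across all 32 lanes
}
```

On Ampere and newer, `__reduce_or_sync(0xffffffff, acc)` computes the same result in a single intrinsic, which is what the old comment in `page_decode.cuh` about `__reduce_or_sync()` alludes to.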
@@ -113,66 +99,56 @@ inline __device__ double Int128ToDouble_rn(uint64_t lo, int64_t hi)
return sign * __fma_rn(__ll2double_rn(hi), 4294967296.0 * 4294967296.0, __ull2double_rn(lo));
}

-inline __device__ uint32_t unaligned_load32(uint8_t const* p)
-{
-  uint32_t ofs    = 3 & reinterpret_cast<uintptr_t>(p);
-  auto const* p32 = reinterpret_cast<uint32_t const*>(p - ofs);
-  uint32_t v      = p32[0];
-  return (ofs) ? __funnelshift_r(v, p32[1], ofs * 8) : v;
-}

-inline __device__ uint64_t unaligned_load64(uint8_t const* p)
+template <typename T>
+  requires(cuda::std::is_same_v<T, uint32_t> or cuda::std::is_same_v<T, uint64_t>)
+inline __device__ T unaligned_load(uint8_t const* p)
{
-  uint32_t ofs    = 3 & reinterpret_cast<uintptr_t>(p);
-  auto const* p32 = reinterpret_cast<uint32_t const*>(p - ofs);
-  uint32_t v0     = p32[0];
-  uint32_t v1     = p32[1];
-  if (ofs) {
-    v0 = __funnelshift_r(v0, v1, ofs * 8);
-    v1 = __funnelshift_r(v1, p32[2], ofs * 8);
-  }
-  return (((uint64_t)v1) << 32) | v0;
+  T value;
+  cuda::std::memcpy(&value, p, sizeof(T));
+  return value;
}

-template <unsigned int nthreads, bool sync_before_store>
-inline __device__ void memcpy_block(void* dstv, void const* srcv, uint32_t len, uint32_t t)
-{
+template <uint32_t nthreads, bool sync_before_store>
+inline __device__ void memcpy_block(void* dstv,
+                                    void const* srcv,
+                                    uint32_t len,
+                                    cooperative_groups::thread_block const& block)
+{
+  static_assert(
+    nthreads >= sizeof(uint32_t),
+    "The kernel block size (nthreads) must be greater than or equal to the size of uint32_t");
+  auto const t = block.thread_rank();
auto* dst = static_cast<uint8_t*>(dstv);
auto const* src = static_cast<uint8_t const*>(srcv);
-  uint32_t dst_align_bytes, src_align_bytes, src_align_bits;
// Align output to 32-bit
-  dst_align_bytes = 3 & -reinterpret_cast<intptr_t>(dst);
+  auto const dst_align_bytes = static_cast<uint32_t>(0x3 & -reinterpret_cast<intptr_t>(dst));
if (dst_align_bytes != 0) {
-    uint32_t align_len = min(dst_align_bytes, len);
-    uint8_t b;
-    if (t < align_len) { b = src[t]; }
-    if constexpr (sync_before_store) { __syncthreads(); }
-    if (t < align_len) { dst[t] = b; }
+    auto const align_len = cuda::std::min<uint32_t>(dst_align_bytes, len);
+    uint8_t byte;
+    if (t < align_len) { byte = src[t]; }
+    if constexpr (sync_before_store) { block.sync(); }
+    if (t < align_len) { dst[t] = byte; }
src += align_len;
dst += align_len;
len -= align_len;
}
-  src_align_bytes = (uint32_t)(3 & reinterpret_cast<uintptr_t>(src));
-  src_align_bits  = src_align_bytes * 8;
-  while (len >= 4) {
-    auto const* src32 = reinterpret_cast<uint32_t const*>(src - src_align_bytes);
-    uint32_t copy_cnt = min(len >> 2, nthreads);
-    uint32_t v;
-    if (t < copy_cnt) {
-      v = src32[t];
-      if (src_align_bits != 0) { v = __funnelshift_r(v, src32[t + 1], src_align_bits); }
-    }
-    if constexpr (sync_before_store) { __syncthreads(); }
-    if (t < copy_cnt) { reinterpret_cast<uint32_t*>(dst)[t] = v; }
-    src += copy_cnt * 4;
-    dst += copy_cnt * 4;
-    len -= copy_cnt * 4;
+  // Copy 32-bit chunks
+  while (len >= sizeof(uint32_t)) {
+    auto const copy_cnt = cuda::std::min<uint32_t>(len / sizeof(uint32_t), nthreads);
+    uint32_t value;
+    if (t < copy_cnt) { value = unaligned_load<uint32_t>(src + (t * sizeof(uint32_t))); }
+    if constexpr (sync_before_store) { block.sync(); }
+    if (t < copy_cnt) { reinterpret_cast<uint32_t*>(dst)[t] = value; }
+    src += copy_cnt * sizeof(uint32_t);
+    dst += copy_cnt * sizeof(uint32_t);
+    len -= copy_cnt * sizeof(uint32_t);
}
+  // Copy the remaining bytes
if (len != 0) {
-    uint8_t b;
-    if (t < len) { b = src[t]; }
-    if constexpr (sync_before_store) { __syncthreads(); }
-    if (t < len) { dst[t] = b; }
+    uint8_t byte;
+    if (t < len) { byte = src[t]; }
+    if constexpr (sync_before_store) { block.sync(); }
+    if (t < len) { dst[t] = byte; }
}
}

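Finally, a minimal sketch of how a kernel now invokes the reworked `memcpy_block`, passing the cooperative-groups block handle rather than a raw thread rank (kernel name and launch setup are illustrative, not from the PR):

```cpp
#include <cooperative_groups.h>
#include <cstdint>

template <uint32_t block_size>
__global__ void gather_bytes(uint8_t* dst, uint8_t const* src, uint32_t len)
{
  // Every thread of the block participates in the copy. With
  // sync_before_store = true, memcpy_block synchronizes the whole block
  // between reading each chunk and writing it back out.
  cudf::io::memcpy_block<block_size, true>(
    dst, src, len, cooperative_groups::this_thread_block());
}

// Launch example: gather_bytes<256><<<1, 256>>>(dst, src, len);
```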