Merged

43 commits
1455bf5
Add overflow support for signed integers and decimals
PointKernel Aug 5, 2025
bf00cce
Merge remote-tracking branch 'upstream/main' into sum-overflow-decimal
PointKernel Oct 21, 2025
3badfc7
Resolve conflicts
PointKernel Oct 21, 2025
8a44639
Cleanups
PointKernel Oct 21, 2025
e0616f4
Fix
PointKernel Oct 21, 2025
e5fd7ee
Cleanups
PointKernel Oct 21, 2025
11f7162
Disable decimal128 support for now
PointKernel Oct 21, 2025
2cb91e1
Merge remote-tracking branch 'upstream/main' into sum-overflow-decimal
PointKernel Oct 21, 2025
a914990
Minor cleanups
PointKernel Oct 21, 2025
512701b
Update comments
PointKernel Oct 21, 2025
2de4417
Add decimal128 support
PointKernel Oct 21, 2025
7ae723a
Update tests
PointKernel Oct 21, 2025
0429b9e
Cleanups
PointKernel Oct 21, 2025
e08ea0f
Merge remote-tracking branch 'upstream/main' into sum-overflow-decimal
PointKernel Oct 21, 2025
7a9dc29
Minor fix
PointKernel Oct 21, 2025
2710856
Fix Hopper atomicAdd
PointKernel Oct 21, 2025
7c0f1da
Cleanups
PointKernel Oct 22, 2025
60bde43
Fix the stream issue in tests
PointKernel Oct 22, 2025
a8013fc
Merge remote-tracking branch 'upstream/main' into sum-overflow-decimal
PointKernel Oct 22, 2025
64fe259
Minor cleanups
PointKernel Oct 22, 2025
9efc090
Merge remote-tracking branch 'upstream/main' into sum-overflow-decimal
PointKernel Oct 22, 2025
a11afbe
Apply suggestion from @davidwendt
PointKernel Oct 22, 2025
d520a28
Apply suggestion from @davidwendt
PointKernel Oct 22, 2025
d704fc7
Merge branch 'main' into sum-overflow-decimal
PointKernel Oct 22, 2025
b775e55
Merge remote-tracking branch 'upstream/main' into sum-overflow-decimal
PointKernel Oct 23, 2025
0748f55
Clean up tests
PointKernel Oct 23, 2025
a045b5d
Merge remote-tracking branch 'origin/sum-overflow-decimal' into sum-o…
PointKernel Oct 23, 2025
2f154ab
Header cleanups
PointKernel Oct 23, 2025
286dae2
Merge remote-tracking branch 'upstream/main' into sum-overflow-decimal
PointKernel Oct 23, 2025
4f6068c
Merge branch 'main' into sum-overflow-decimal
PointKernel Oct 27, 2025
053624c
Merge remote-tracking branch 'upstream/main' into sum-overflow-decimal
PointKernel Oct 27, 2025
da628af
Update copyrights
PointKernel Oct 27, 2025
532580e
Merge remote-tracking branch 'origin/sum-overflow-decimal' into sum-o…
PointKernel Oct 27, 2025
d6e3d9a
Merge branch 'main' into sum-overflow-decimal
PointKernel Oct 27, 2025
c934333
Merge branch 'main' into sum-overflow-decimal
PointKernel Oct 29, 2025
a7b5422
Merge branch 'main' into sum-overflow-decimal
PointKernel Oct 29, 2025
3856f1d
Update comment
PointKernel Nov 3, 2025
a728a39
Merge remote-tracking branch 'origin/sum-overflow-decimal' into sum-o…
PointKernel Nov 3, 2025
5226059
Merge remote-tracking branch 'upstream/main' into sum-overflow-decimal
PointKernel Nov 3, 2025
4372f92
Update cpp/include/cudf/detail/aggregation/aggregation.hpp
PointKernel Nov 3, 2025
76690de
Merge branch 'main' into sum-overflow-decimal
PointKernel Nov 3, 2025
55fc6db
Merge remote-tracking branch 'upstream/main' into sum-overflow-decimal
PointKernel Nov 4, 2025
e1288e7
Merge remote-tracking branch 'origin/sum-overflow-decimal' into sum-o…
PointKernel Nov 4, 2025
5 changes: 4 additions & 1 deletion cpp/include/cudf/detail/aggregation/aggregation.hpp
@@ -1416,8 +1416,11 @@ struct target_type_impl<Source,
using type = Source;
};

// SUM_WITH_OVERFLOW always outputs a struct {sum: int64_t, overflow: bool} regardless of input type
// SUM_WITH_OVERFLOW outputs a struct {sum: Source, overflow: bool} where the sum type matches the
// input type. Only signed integral types (excluding bool) and decimal types are supported.
template <typename Source>
requires((cudf::is_integral_not_bool<Source>() && cudf::is_signed<Source>()) ||
cudf::is_fixed_point<Source>())
struct target_type_impl<Source, aggregation::SUM_WITH_OVERFLOW> {
using type = struct_view; // SUM_WITH_OVERFLOW outputs a struct with sum and overflow fields
};
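For reference, the effect of this constrained specialization can be sketched with a few compile-time checks, assuming the target_type_t alias that this header derives from target_type_impl and the usual void fallback of the unspecialized template for unsupported combinations (both are assumptions, not shown in this diff):

#include <type_traits>

// Illustrative only: target_type_t and the void fallback are assumed from the surrounding header.
static_assert(std::is_same_v<
  cudf::detail::target_type_t<int64_t, cudf::aggregation::SUM_WITH_OVERFLOW>, cudf::struct_view>);
static_assert(std::is_same_v<
  cudf::detail::target_type_t<numeric::decimal64, cudf::aggregation::SUM_WITH_OVERFLOW>,
  cudf::struct_view>);
// Unsigned integrals and bool fail the requires clause, so they fall back to the primary template:
static_assert(std::is_same_v<
  cudf::detail::target_type_t<uint32_t, cudf::aggregation::SUM_WITH_OVERFLOW>, void>);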
33 changes: 16 additions & 17 deletions cpp/include/cudf/detail/aggregation/device_aggregators.cuh
@@ -23,7 +23,9 @@
#include <cudf/detail/utilities/device_atomics.cuh>
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/table/table_device_view.cuh>
#include <cudf/types.hpp>
#include <cudf/utilities/traits.cuh>
#include <cudf/utilities/type_dispatcher.hpp>

#include <cuda/std/limits>
#include <cuda/std/type_traits>
@@ -138,41 +140,38 @@ struct update_target_element<Source, aggregation::SUM> {
};

template <typename Source>
requires(cuda::std::is_same_v<Source, int64_t>)
requires(
(cudf::is_integral_not_bool<Source>() && cudf::is_signed<Source>()) ||
(cudf::is_fixed_point<Source>() && cudf::has_atomic_support<device_storage_type_t<Source>>()) ||
cuda::std::is_same_v<Source, numeric::decimal128>)
struct update_target_element<Source, aggregation::SUM_WITH_OVERFLOW> {
using DeviceType = device_storage_type_t<Source>;
static constexpr auto type_max = cuda::std::numeric_limits<DeviceType>::max();
static constexpr auto type_min = cuda::std::numeric_limits<DeviceType>::min();

__device__ void operator()(mutable_column_device_view target,
size_type target_index,
column_device_view source,
size_type source_index) const noexcept
{
// For SUM_WITH_OVERFLOW, target is a struct with sum value at child(0) and overflow flag at
// child(1)
auto sum_column = target.child(0);
auto overflow_column = target.child(1);

auto const source_value = source.element<Source>(source_index);
auto const source_value = source.element<DeviceType>(source_index);
auto const old_sum =
cudf::detail::atomic_add(&sum_column.element<int64_t>(target_index), source_value);
cudf::detail::atomic_add(&sum_column.element<DeviceType>(target_index), source_value);

// Early exit if overflow is already set to avoid unnecessary overflow checking
// Early exit if overflow is already set
auto bool_ref = cuda::atomic_ref<bool, cuda::thread_scope_device>{
*(overflow_column.data<bool>() + target_index)};
if (bool_ref.load(cuda::memory_order_relaxed)) { return; }

// Check for overflow before performing the addition to avoid UB
// For positive overflow: old_sum > 0, source_value > 0, and old_sum > max - source_value
// For negative overflow: old_sum < 0, source_value < 0, and old_sum < min - source_value
// TODO: to be replaced by CCCL equivalents once https://github.com/NVIDIA/cccl/pull/3755 is
// ready
auto constexpr int64_max = cuda::std::numeric_limits<int64_t>::max();
auto constexpr int64_min = cuda::std::numeric_limits<int64_t>::min();
auto const overflow =
((old_sum > 0 && source_value > 0 && old_sum > int64_max - source_value) ||
(old_sum < 0 && source_value < 0 && old_sum < int64_min - source_value));
if (overflow) {
// Atomically set overflow flag to true (use atomic_max since true > false)
cudf::detail::atomic_max(&overflow_column.element<bool>(target_index), true);
}
source_value > 0 ? old_sum > type_max - source_value : old_sum < type_min - source_value;

if (overflow) { cudf::detail::atomic_max(&overflow_column.element<bool>(target_index), true); }
}
};
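The overflow test above can be reasoned about with a small host-side model of the same predicate. The helper below is a sketch for illustration only (it is not part of this change) and shows that old_sum + addend is never actually evaluated, so no signed-overflow UB occurs:

#include <limits>

// Host-side model of the device-side overflow predicate (illustrative only).
template <typename T>
constexpr bool would_overflow(T old_sum, T addend)
{
  constexpr T type_max = std::numeric_limits<T>::max();
  constexpr T type_min = std::numeric_limits<T>::min();
  // Positive direction: addend > 0 and old_sum > max - addend
  // Negative direction: addend <= 0 and old_sum < min - addend
  return addend > 0 ? old_sum > type_max - addend : old_sum < type_min - addend;
}

static_assert(would_overflow<int64_t>(std::numeric_limits<int64_t>::max(), 1));
static_assert(!would_overflow<int64_t>(0, -1));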

73 changes: 73 additions & 0 deletions cpp/include/cudf/detail/utilities/device_atomics.cuh
@@ -39,6 +39,7 @@

#include <cuda/atomic>
#include <cuda/std/type_traits>
#include <cuda/utility>

namespace cudf {
namespace detail {
@@ -346,5 +347,77 @@ __forceinline__ __device__ T atomic_cas(T* address, T compare, T val)
return cudf::detail::typesAtomicCASImpl<T>()(address, compare, val);
}

/**
* @brief Helper function to calculate carry for 64-bit addition
*
* @param old_val The original 64-bit value
* @param add_val The value being added
* @param carry_in Carry from previous addition
* @return 1 if carry is generated, 0 otherwise
*/
__device__ __forceinline__ uint64_t calculate_carry_64(uint64_t old_val,
uint64_t add_val,
uint64_t carry_in)
{
// Use __uint128_t to detect overflow beyond 64-bit range
__uint128_t sum = static_cast<__uint128_t>(old_val) + add_val + carry_in;
return sum > cuda::std::numeric_limits<uint64_t>::max();
}

/**
* @brief Atomic addition for __int128_t with architecture-specific optimization
*
* Uses native 128-bit CAS on Hopper+ GPUs (compute capability 9.0+) for optimal
* performance. Falls back to two 64-bit atomic CAS operations with carry propagation
* on older GPU architectures.
*
* @param address Pointer to the __int128_t value
* @param val Value to add
* @return The old value before addition
*/
__forceinline__ __device__ __int128_t atomic_add(__int128_t* address, __int128_t val)
{
#if __CUDA_ARCH__ >= 900
__int128_t expected, desired;

do {
expected = *address;
desired = expected + val;
} while (atomicCAS(address, expected, desired) != expected);

return expected;
#else
uint64_t* const target_ptr = reinterpret_cast<uint64_t*>(address);
__uint128_t const add_val_unsigned = static_cast<__uint128_t>(val);

// Split the 128-bit add value into two 64-bit parts
uint64_t const add_low = static_cast<uint64_t>(add_val_unsigned);
uint64_t const add_high = static_cast<uint64_t>(add_val_unsigned >> 64);

uint64_t carry = 0;
uint64_t old_parts[2];

cuda::static_for<0, 2>([&](auto i) {
uint64_t const current_add = (i == 0) ? add_low : add_high;
uint64_t expected_part, new_part;

cuda::atomic_ref<uint64_t, cuda::thread_scope_device> atomic_part{target_ptr[i]};

do {
expected_part = atomic_part.load();
new_part = expected_part + current_add + carry;
} while (
!atomic_part.compare_exchange_weak(expected_part, new_part, cuda::memory_order_relaxed));

old_parts[i] = expected_part;
carry = calculate_carry_64(expected_part, current_add, carry);
});

__uint128_t const old_val_unsigned =
(static_cast<__uint128_t>(old_parts[1]) << 64) | old_parts[0];
return static_cast<__int128_t>(old_val_unsigned);
#endif
}

} // namespace detail
} // namespace cudf
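The pre-Hopper fallback is easier to follow with a host-side model of the same limb-wise addition. The sketch below (illustrative names, not part of the diff) shows how the carry generated by the low 64-bit limb is folded into the high limb, which is what calculate_carry_64 computes between the two CAS loops:

#include <cstdint>

// Illustrative host model: a 128-bit value held as two 64-bit limbs, added limb by limb.
struct limbs128 {
  uint64_t lo;
  uint64_t hi;
};

inline limbs128 add_limbwise(limbs128 target, __int128_t val)
{
  auto const v      = static_cast<__uint128_t>(val);
  uint64_t const lo = static_cast<uint64_t>(v);
  uint64_t const hi = static_cast<uint64_t>(v >> 64);

  uint64_t const new_lo = target.lo + lo;
  uint64_t const carry  = new_lo < target.lo ? 1 : 0;  // low limb wrapped, so carry out
  uint64_t const new_hi = target.hi + hi + carry;
  return {new_lo, new_hi};
}
// e.g. adding 1 to {lo = UINT64_MAX, hi = 0} yields {lo = 0, hi = 1}.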
14 changes: 6 additions & 8 deletions cpp/src/aggregation/aggregation.cu
@@ -96,17 +96,15 @@ struct identity_initializer {
requires(is_supported<T, k>())
{
if constexpr (k == aggregation::SUM_WITH_OVERFLOW) {
// SUM_WITH_OVERFLOW uses a struct with sum (int64_t) and overflow (bool) children
// Initialize sum child to 0 and overflow child to false
// SUM_WITH_OVERFLOW uses a struct with sum and overflow children
auto sum_col = col.child(0);
auto overflow_col = col.child(1);

auto zip_begin = thrust::make_zip_iterator(
thrust::make_tuple(sum_col.begin<int64_t>(), overflow_col.begin<bool>()));
thrust::fill(rmm::exec_policy_nosync(stream),
zip_begin,
zip_begin + col.size(),
thrust::make_tuple(int64_t{0}, false));
// Initialize sum column using standard SUM aggregation dispatch
dispatch_type_and_aggregation(
sum_col.type(), aggregation::SUM, identity_initializer{}, sum_col, stream);
thrust::uninitialized_fill_n(
rmm::exec_policy_nosync(stream), overflow_col.begin<bool>(), col.size(), false);
} else if constexpr (std::is_same_v<T, cudf::struct_view>) {
// This should only happen for SUM_WITH_OVERFLOW, but handle it just in case
CUDF_FAIL("Struct columns are only supported for SUM_WITH_OVERFLOW aggregation");
17 changes: 15 additions & 2 deletions cpp/src/groupby/groupby.cu
@@ -125,9 +125,10 @@ struct empty_column_constructor {
if constexpr (k == aggregation::Kind::MERGE_HISTOGRAM) { return empty_like(values); }

if constexpr (k == aggregation::Kind::SUM_WITH_OVERFLOW) {
// SUM_WITH_OVERFLOW returns a struct with sum (int64_t) and overflow (bool) children
// SUM_WITH_OVERFLOW returns a struct with sum (same type as input) and overflow (bool)
// children
std::vector<std::unique_ptr<cudf::column>> children;
children.push_back(make_empty_column(cudf::data_type{cudf::type_id::INT64}));
children.push_back(make_empty_column(values.type()));
children.push_back(make_empty_column(cudf::data_type{cudf::type_id::BOOL8}));
return make_structs_column(0, std::move(children), 0, {}, stream, mr);
}
@@ -212,6 +213,18 @@ void verify_valid_requests(host_span<RequestType const> requests)
});
}),
"Invalid type/aggregation combination.");

// Additional validation for SUM_WITH_OVERFLOW: only signed integers and decimals are supported
for (auto const& request : requests) {
for (auto const& agg : request.aggregations) {
if (agg->kind == aggregation::SUM_WITH_OVERFLOW) {
CUDF_EXPECTS(
cudf::detail::is_valid_aggregation(request.values.type(), aggregation::SUM_WITH_OVERFLOW),
"SUM_WITH_OVERFLOW aggregation only supports signed integer types and decimal types. "
"Unsigned integers, bool, dictionary columns, and other types are not supported.");
}
}
}
}

} // namespace
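To make the accepted type set concrete, the sketch below probes the same is_valid_aggregation check used above; the helper name and header paths are illustrative assumptions, while the expected results follow from the target_type_impl constraint in aggregation.hpp:

#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/types.hpp>

// Illustrative helper (not part of this diff) wrapping the check performed above.
inline bool accepts_sum_with_overflow(cudf::data_type t)
{
  return cudf::detail::is_valid_aggregation(t, cudf::aggregation::SUM_WITH_OVERFLOW);
}
// Expected, given the constraint in aggregation.hpp:
//   INT8/INT16/INT32/INT64 and DECIMAL32/64/128 -> true
//   UINT32, UINT64, BOOL8, STRING               -> false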
4 changes: 2 additions & 2 deletions cpp/src/groupby/hash/output_utils.cu
@@ -72,11 +72,11 @@ struct result_column_creator {
return make_fixed_width_column(data_type{type_id}, size, mask_state, stream, mr);
};

auto make_children = [&make_empty_column](size_type size) {
auto make_children = [&make_empty_column, col_type = col.type()](size_type size) {
std::vector<std::unique_ptr<column>> children;
// Create sum child column (same type as the input values) - no null mask needed, struct-level
// mask handles nullability
children.push_back(make_empty_column(type_id::INT64, size, mask_state::UNALLOCATED));
children.push_back(make_empty_column(col_type.id(), size, mask_state::UNALLOCATED));
// Create overflow child column (bool) - no null mask needed, only value matters
children.push_back(make_empty_column(type_id::BOOL8, size, mask_state::UNALLOCATED));
return children;
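Given the child layout created here (and in the empty-result path in groupby.cu), a consumer reads the SUM_WITH_OVERFLOW result roughly as in the sketch below; the function and variable names are illustrative only:

#include <cudf/column/column_view.hpp>

// Illustrative: pulling apart a SUM_WITH_OVERFLOW result column.
inline void inspect_sum_with_overflow(cudf::column_view const& result)
{
  auto const sums      = result.child(0);  // same data type as the aggregated values
  auto const overflows = result.child(1);  // BOOL8, true where a group's sum overflowed
  // use `sums` as the per-group totals, consulting `overflows` before trusting them
}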
5 changes: 5 additions & 0 deletions cpp/src/groupby/sort/aggregate.cpp
@@ -875,6 +875,11 @@ std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> groupby::sort
auto store_functor =
detail::aggregate_result_functor(request.values, helper(), cache, stream, mr);
for (auto const& agg : request.aggregations) {
// SUM_WITH_OVERFLOW is only supported with hash-based groupby, not sort-based
CUDF_EXPECTS(agg->kind != aggregation::SUM_WITH_OVERFLOW,
"SUM_WITH_OVERFLOW aggregation is only supported with hash-based groupby, not "
"sort-based groupby");

// TODO (dm): single pass compute all supported reductions
cudf::detail::aggregation_dispatcher(agg->kind, store_functor, *agg);
}
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
@@ -162,6 +162,7 @@ ConfigureTest(
groupby/sum_of_squares_tests.cpp
groupby/sum_scan_tests.cpp
groupby/sum_tests.cpp
groupby/sum_with_overflow_tests.cpp
groupby/tdigest_tests.cpp
groupby/var_tests.cpp
GPUS 1
49 changes: 48 additions & 1 deletion cpp/tests/device_atomics/device_atomics_test.cu
@@ -23,11 +23,14 @@
#include <cudf/detail/utilities/device_atomics.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/error.hpp>
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/traits.hpp>
#include <cudf/wrappers/timestamps.hpp>

#include <thrust/host_vector.h>
#include <rmm/device_scalar.hpp>

#include <cuda/std/limits>

#include <algorithm>

@@ -261,4 +264,48 @@ TYPED_TEST(AtomicsTest, atomicCASRandom)
this->atomic_test(input_array, is_cas_test, block_size, grid_size);
}

__global__ void test_single_atomic_add_kernel(__int128_t* target, __int128_t value)
{
cudf::detail::atomic_add(target, value);
}

class Atomic128Test : public cudf::test::BaseFixture {
public:
void run_atomic_add_test(__int128_t initial_value,
__int128_t add_value,
__int128_t expected_result)
{
rmm::device_scalar<__int128_t> d_target(initial_value, cudf::get_default_stream());
test_single_atomic_add_kernel<<<32, 256, 0, cudf::get_default_stream().value()>>>(
d_target.data(), add_value);
CUDF_CHECK_CUDA(cudf::get_default_stream().value());
__int128_t result = d_target.value(cudf::get_default_stream());
EXPECT_EQ(result, expected_result);
}
};

TEST_F(Atomic128Test, BasicAddition) { run_atomic_add_test(0, 1, 32 * 256 * 1); }

TEST_F(Atomic128Test, CarryPropagation)
{
constexpr int total_threads = 32 * 256;
__int128_t initial_value = cuda::std::numeric_limits<int64_t>::max() - total_threads + 1;
__int128_t expected = static_cast<__int128_t>(cuda::std::numeric_limits<int64_t>::max()) + 1;
run_atomic_add_test(initial_value, 1, expected);
}

TEST_F(Atomic128Test, NegativeNumbers)
{
constexpr int total_threads = 32 * 256;
run_atomic_add_test(0, -50, -total_threads * 50);
}

TEST_F(Atomic128Test, NegativeCarryPropagation)
{
constexpr int total_threads = 32 * 256;
__int128_t initial_value = cuda::std::numeric_limits<int64_t>::min() + total_threads - 1;
__int128_t expected = static_cast<__int128_t>(cuda::std::numeric_limits<int64_t>::min()) - 1;
run_atomic_add_test(initial_value, -1, expected);
}
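For clarity, the expected values in these tests follow from the launch configuration in run_atomic_add_test (32 blocks of 256 threads, one atomic_add per thread):

//   total adds               : 32 * 256 = 8192
//   BasicAddition            : 0 + 8192 * 1                       = 8192
//   NegativeNumbers          : 0 + 8192 * (-50)                   = -409600
//   CarryPropagation         : (INT64_MAX - 8191) + 8192 * 1      = INT64_MAX + 1  (carry into high limb)
//   NegativeCarryPropagation : (INT64_MIN + 8191) + 8192 * (-1)   = INT64_MIN - 1  (borrow from high limb)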

CUDF_TEST_PROGRAM_MAIN()