Skip to content

Optimizations for tdigest generation. #19140

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: branch-25.08
Choose a base branch
from

Conversation

nvdbaranec
Copy link
Contributor

@nvdbaranec nvdbaranec commented Jun 11, 2025

This PR implements optimizations for speeding up a specific part of the tdigest generation code. Tdigest generation happens in two parts:

  • Based on the input size and the user requested fidelity, a set of segments that represent "buckets" of input values to be merged together into centroids is generated.
  • The input values are then merged together via a segmented reduction into these buckets.

The issue lies with the first step. The process of choosing the buckets is inherently linear per input group. It uses a nonlinear 'scale function' to determine where the next bucket cutoff point should be based on where you currently are. The current implementation uses 1 thread per input group. The use case here involves only 1 group, with a large number of values. Broadly, ~5,000,000 input values, using a max of 10,000 segments, resulting in ~5000 actual segments being generated. The computation of those 5000 segments is the `issue.

As implemented today, it takes roughly 6.5ms to generate that on an A5000 GPU.

There were 3 optimizations pursued:

  • First optimization: Use the CPU to do this particular step for small numbers of groups. The CPU is considerably faster here per group. A single core of a Threadripper taking roughly 250us to do the same work. This optimization works as follows: If we have up to 64 groups of work, run 32 of them on the CPU and the remainder on the GPU. 32 is the rough break-even point where they take about the same amount of time. Above 64 groups, run it all on the GPU because the inherent parallelism starts to dominate, and we don't have to do a device->pinned copy stop to make the data available to the CPU. Since our specific customer case involves one group, this drops the processing time for the section from 6.5ms to 230us. Below we can see the case (highlighted in green) where the CPU and GPU are overlapping the same amount of work (32 groups CPU and 32 groups GPU).

image

  • Second optimization. As with many GPU algorithms, we have to run this computation twice. Once to determine the size the outputs, and once to actually fill them in. So we're doubling our cost. However, we can make some reasonable assumptions about how big the worst case size will be. It is tied to the cluster limit (10,000 specified by the user). So instead of exactly computing the number of clusters per group in a separate pass, we can just allocate the worst case and do the whole process in one. This number can in theory be large though. For example, 2000 groups * 10000 clusters * 8 bytes : 160MB. So I chose a somewhat arbitrary cap of 256 MB. If it is going to require more than that, we just do the two passes. It might be worth making this configurable via en environment variable.

  • Third optimization. A big part of the slowness is related to using doubles with some trig functions (sin and asin). Switching to floats brings the time down by about 40%. However, it caused some accuracy errors that I couldn't easily fix by just adjusting our error tolerance without getting unreasonable (in ulps). But, an easy reformulation of the math was pointed out to me which allows us to remove the sin and asin altogether, instead just costing a (double) sqrt per call. This results in about a 50% speedup for the function.

I added a benchmark specific to tdigest reductions (we needed it anyway), with emphasis on this specific case: small numbers of large groups. Speedups over the existing code are significant:

Old:

num_groups rows_per_group max_centroids Samples CPU Time Noise GPU Time Noise
1 5000000 10000 37x 13.609 ms 0.02% 13.604 ms 0.02%
16 5000000 10000 20x 25.281 ms 0.03% 25.276 ms 0.03%
64 5000000 10000 11x 63.460 ms 0.03% 63.455 ms 0.03%
1 1000000 10000 39x 13.018 ms 0.03% 13.013 ms 0.03%
16 1000000 10000 34x 15.139 ms 0.04% 15.134 ms 0.04%
64 1000000 10000 23x 22.695 ms 0.04% 22.690 ms 0.04%
1 5000000 1000 233x 2.158 ms 0.09% 2.154 ms 0.09%
16 5000000 1000 36x 14.238 ms 0.03% 14.233 ms 0.03%
64 5000000 1000 11x 52.956 ms 0.01% 52.950 ms 0.01%
1 1000000 1000 335x 1.500 ms 0.46% 1.496 ms 0.13%
16 1000000 1000 133x 3.791 ms 0.09% 3.786 ms 0.09%
64 1000000 1000 45x 11.170 ms 0.04% 11.165 ms 0.04%

New:

num_groups rows_per_group max_centroids Samples CPU Time Noise GPU Time Noise
1 5000000 10000 464x 1.083 ms 2.64% 1.079 ms 2.65%
16 5000000 10000 560x 15.280 ms 1.35% 15.276 ms 1.35%
64 5000000 10000 11x 58.353 ms 0.12% 58.349 ms 0.12%
1 1000000 10000 1392x 365.365 us 1.09% 361.544 us 1.11%
16 1000000 10000 122x 4.120 ms 0.44% 4.117 ms 0.44%
64 1000000 10000 37x 13.577 ms 0.09% 13.573 ms 0.09%
1 5000000 1000 528x 1.014 ms 5.87% 1.010 ms 5.90%
16 5000000 1000 832x 14.139 ms 5.03% 14.135 ms 5.03%
64 5000000 1000 11x 54.931 ms 0.01% 54.926 ms 0.01%
1 1000000 1000 2960x 296.354 us 2.36% 292.453 us 2.25%
16 1000000 1000 560x 2.929 ms 3.19% 2.925 ms 3.20%
64 1000000 1000 45x 11.134 ms 0.11% 11.130 ms 0.11%

This yields as much as a 10x speedup. You can see where the time starts to flatten out at 64 groups, where the parallelism of the GPU starts to win.

Existing (merge aggregation) benchmarks saw some wins too

Old

num_tdigests tdigest_size tdigests_per_group max_centroids Samples CPU Time Noise GPU Time Noise Elem/s
500000 1 1 10000 1072x 472.379 us 3.17% 467.688 us 3.19% 1.069G
500000 1000 1 10000 11x 783.576 ms 0.17% 783.563 ms 0.17% 638.110M
500000 1 1 1000 2528x 463.077 us 0.73% 458.498 us 0.74% 1.091G
500000 1000 1 1000 11x 540.151 ms 0.13% 540.140 ms 0.13% 925.686M

New

num_tdigests tdigest_size tdigests_per_group max_centroids Samples CPU Time Noise GPU Time Noise Elem/s
500000 1 1 10000 1248x 404.618 us 2.50% 400.754 us 2.52% 1.248G
500000 1000 1 10000 11x 765.092 ms 0.12% 765.079 ms 0.12% 653.527M
500000 1 1 1000 2208x 399.257 us 0.92% 395.360 us 0.93% 1.265G
500000 1000 1 1000 11x 532.907 ms 0.24% 532.895 ms 0.24% 938.271M

Leaving as a draft for now to get some specific Spark testing done with it.

- A cpu path for cases where we have small numbers of potentially large inputs
- Use floats instead of doubles to run trig functions in the scale function
- When appropriate, use a worst-case memory allocation strategy to cut the number of cluster computing kernel calls in half.
…back from pinned to device in the CPU or partial CPU case. Changed tests so that

they forcibly run in non-CPU mode as well as CPU mode.
Copy link

copy-pr-bot bot commented Jun 11, 2025

Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.

Contributors can view more details about this message here.

@github-actions github-actions bot added the libcudf Affects libcudf (C++/CUDA) code. label Jun 11, 2025
… lot of work in the scale function. Nets about a 40% win

in the function itself.
… numbers of large groups to test a customer use case.
@mythrocks mythrocks added improvement Improvement / enhancement to an existing function non-breaking Non-breaking change labels Jun 16, 2025
@revans2
Copy link
Contributor

revans2 commented Jun 17, 2025

The changes look good to me. I ran some tests.

val dfs = (0 until 9).map(i => math.pow(10, i).toLong).map{ rows => 
 spark.range(rows).selectExpr(s"rand($rows) as rn").selectExpr("percentile(rn, array(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99)) as p", "percentile_approx(rn, array(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99)) as pa").selectExpr("p", "pa", "abs(pa[0] - p[0]) as e0", "abs(pa[1] - p[1]) as e1", "abs(pa[2] - p[2]) as e2", "abs(pa[3] - p[3]) as e3", "abs(pa[4] - p[4]) as e4", "abs(pa[5] - p[5]) as e5", "abs(pa[6] - p[6]) as e6", "abs(pa[7] - p[7]) as e7", "abs(pa[8] - p[8]) as e8","abs(pa[9] - p[9]) as e9","abs(pa[10] - p[10]) as e10", s"$rows as rc")
}
spark.conf.set("spark.rapids.sql.enabled",false)
dfs.reduceLeft((a, b) => a.unionAll(b)).repartition(1).sortWithinPartitions("rc").write.mode("overwrite").parquet("/data/tmp/CPU_NEW_RESULT")
spark.conf.set("spark.rapids.sql.enabled",true)
dfs.reduceLeft((a, b) => a.unionAll(b)).repartition(1).sortWithinPartitions("rc").write.mode("overwrite").parquet("/data/tmp/GPU_NEW_RESULT")

val cpu = spark.read.parquet("/data/tmp/CPU_NEW_RESULT")
val gpu = spark.read.parquet("/data/tmp/GPU_NEW_RESULT")

cpu.alias("cpu").join(gpu.alias("gpu"), cpu("rc") === gpu("rc")).select(col("cpu.rc"), (col("cpu.e0") >= col("gpu.e0")).alias("e0"), (col("cpu.e1") >= col("gpu.e1")).alias("e1"), (col("cpu.e2") >= col("gpu.e2")).alias("e2"), (col("cpu.e3") >= col("gpu.e3")).alias("e3"), (col("cpu.e4") >= col("gpu.e4")).alias("e4"), (col("cpu.e5") >= col("gpu.e5")).alias("e5"), (col("cpu.e6") >= col("gpu.e6")).alias("e6"), (col("cpu.e7") >= col("gpu.e7")).alias("e7"), (col("cpu.e8") >= col("gpu.e8")).alias("e8"), (col("cpu.e9") >= col("gpu.e9")).alias("e9"), (col("cpu.e10") >= col("gpu.e10")).alias("e10")).show()

Effectively I am calculating the difference between the actual percentile and the approximate version for both the CPU and the GPU, so that we can see if the GPU is a better, or worse estimate than the CPU is. For all of the tests that I ran the GPU is as good as or better than the CPU both before this change and after it.

+---------+----+----+----+----+----+----+----+----+----+-----+----+
|       rc|  e0|  e1|  e2|  e3|  e4|  e5|  e6|  e7|  e8|   e9| e10|
+---------+----+----+----+----+----+----+----+----+----+-----+----+
|        1|true|true|true|true|true|true|true|true|true| true|true|
|       10|true|true|true|true|true|true|true|true|true| true|true|
|      100|true|true|true|true|true|true|true|true|true| true|true|
|     1000|true|true|true|true|true|true|true|true|true| true|true|
|    10000|true|true|true|true|true|true|true|true|true| true|true|
|   100000|true|true|true|true|true|true|true|true|true| true|true|
|  1000000|true|true|true|true|true|true|true|true|true| true|true|
| 10000000|true|true|true|true|true|true|true|true|true| true|true|
|100000000|true|true|true|true|true|true|true|true|true|false|true|
+---------+----+----+----+----+----+----+----+----+----+-----+----+

That said if I compare the new GPU calculation to the old GPU calculation we differ only on the 100,000,000 rows version. In one case we are slightly better, e0, and it all of the other cases we are slightly worse. But it still beats the CPU for accuracy, so it is good for me.

@nvdbaranec nvdbaranec marked this pull request as ready for review June 17, 2025 19:59
@nvdbaranec nvdbaranec requested a review from a team as a code owner June 17, 2025 19:59
@nvdbaranec nvdbaranec requested review from shrshi and mhaseeb123 June 17, 2025 19:59
@ttnghia
Copy link
Contributor

ttnghia commented Jun 17, 2025

Instead of showing old vs new benchmark, can you run the comparing script please? Using https://github.com/NVIDIA/nvbench/blob/main/scripts/nvbench_compare.py.

Comment on lines +123 to +152
auto col = cudf::make_numeric_column(
cudf::data_type{cudf::type_id::INT32}, num_rows, cudf::mask_state::UNALLOCATED, stream, mr);
thrust::sequence(rmm::exec_policy_nosync(stream),
col->mutable_view().begin<int>(),
col->mutable_view().end<int>(),
0);

// group offsets, labels, valid counts
auto iter = thrust::make_counting_iterator(0);
rmm::device_uvector<cudf::size_type> group_offsets(num_groups + 1, stream, mr);
thrust::transform(rmm::exec_policy_nosync(stream),
iter,
iter + group_offsets.size(),
group_offsets.begin(),
cuda::proclaim_return_type<int>(
[rows_per_group] __device__(int i) { return i * rows_per_group; }));
rmm::device_uvector<cudf::size_type> group_labels(num_rows, stream, mr);
thrust::transform(rmm::exec_policy_nosync(stream),
iter,
iter + num_rows,
group_labels.begin(),
cuda::proclaim_return_type<int>(
[rows_per_group] __device__(int i) { return i / rows_per_group; }));
rmm::device_uvector<cudf::size_type> group_valid_counts(num_groups, stream, mr);
auto valid_count_iter = thrust::make_constant_iterator(rows_per_group);
thrust::copy(rmm::exec_policy_nosync(stream),
valid_count_iter,
valid_count_iter + group_valid_counts.size(),
group_valid_counts.begin());

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe all of this can be done with cudf APIs which would be a bit shorter but also mean this file could probably be just a .cpp file instead of .cu file.
Here is my attempt:

  auto zero = cudf::numeric_scalar<cudf::size_type>(0);
  auto col  = cudf::sequence(num_rows, zero);

  auto rows_per_group = num_rows / num_groups;
  auto rpg_scalar     = cudf::numeric_scalar<cudf::size_type>(rows_per_group);
  auto group_offsets  = cudf::sequence(num_groups + 1, zero, rpg_scalar);

  auto rpt_tbl = cudf::repeat(cudf::table_view({cudf::slice(col->view(), {0, num_groups}).front()}),
                 rows_per_group)->release();
  auto group_labels       = std::move(rpt_tbl.front());
  auto group_valid_counts = cudf::sequence(num_groups, rpg_scalar, zero);

Copy link
Member

@mhaseeb123 mhaseeb123 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initial flush. We should use cuda::std::XX in device and host device code

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
improvement Improvement / enhancement to an existing function libcudf Affects libcudf (C++/CUDA) code. non-breaking Non-breaking change
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants