Preprocessing offsets for Parquet non-dictionary string columns #20430
base: main
Conversation
Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.
mhaseeb123
left a comment
Some very early comments
mhaseeb123
left a comment
Flushing some more early comments from the first pass.
    ? 0
    : next_offset - input_thread_string_offset - 4;
}
if (input_thread_string_offset >= (uint32_t)s->dict_size) {
Is this a valid case? Should we set error_code here instead?
This logic is lifted from gpuGetStringData(). I believe this case cannot occur, but I'm trying to keep the same behavior as before.
@pmattione-nvidia
I haven't read through the PR yet, but I'm assuming these preprocessed offsets are stored in global memory? This seems like it could become a pretty big number. We might need to take this into account in the chunked reader memory limit computation.
Part of me wonders here: if we're already prefetching everything into shared memory here to get the lengths, how hard would it be to also just do the rest of the work (copying to output) right here? Otherwise it seems like we're going to end up touching every byte in the input twice. I guess the hard part is that you don't know where your particular page's strings start in the output. Hmm...

Edit: Actually, I think we do know this. We already compute all the string sizes per page during setup, so we could scan those to get the starting output position of each page's first string. Then, as the warp(s) march through the precached buffers, it's easy enough to know where each prefetched piece goes.
Note: I'm not suggesting we change this PR. But it's something to think about, maybe for later. We'd still have to run the decode kernel to generate validity/list info in some cases, but it seems like this might be a way to get a useful overall speedup.
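If that idea were pursued, the per-page starting positions in the output char buffer could come from an exclusive scan over the already-computed per-page string sizes. A minimal sketch of that scan, assuming hypothetical names (this is not code from this PR):

```cuda
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>
#include <thrust/scan.h>

// Exclusive scan of per-page string byte counts -> each page's starting offset
// in the final output char buffer. Names are illustrative only.
void compute_page_output_starts(rmm::device_uvector<size_t> const& page_string_sizes,
                                rmm::device_uvector<size_t>& page_output_starts,
                                rmm::cuda_stream_view stream)
{
  thrust::exclusive_scan(rmm::exec_policy(stream),
                         page_string_sizes.begin(),
                         page_string_sizes.end(),
                         page_output_starts.begin());
}
```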
// Allocate the string offset buffer
_string_offset_buffer = rmm::device_uvector<uint32_t>(total_num_offsets, _stream, _mr);
So this is the part where we might need to take this into account for the input chunk limiting size. If we had 50 million rows, that'd be 256MB, which would be a significant chunk of the typical memory limit set by spark for temporary mem usage. I think an estimate (maybe worst case, pretending there's no nulls) might be ok.
Here's where we add in the temp memory usage for decompression
// include scratch space needed for decompression. for certain codecs (eg ZSTD) this
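For illustration only (not the actual cudf accounting code): a conservative, no-nulls estimate of the new offset buffer that could be folded into that same temporary-memory computation might look like the following; the function name is hypothetical.

```cuda
#include <cstddef>
#include <cstdint>

// One 4-byte offset is stored per string value; ignoring nulls gives a
// worst-case upper bound suitable for the chunk-limit computation.
inline std::size_t string_offset_scratch_estimate(std::size_t num_rows)
{
  return num_rows * sizeof(uint32_t);
}
```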
Done
nvdbaranec
left a comment
Whoops. Didn't mean to approve.
The problem with doing this in the main decode kernel is that we have a lot of shared memory taken up by all of the random decode buffers AND the string decode buffers (so that all of the threads in the block can cooperatively copy the strings). In fact, we use so much shared memory for string decode that it is the limiting factor on occupancy, not the register usage. We're better off having this as a separate kernel where we can have a larger buffer without hurting occupancy. Plus, reading the lengths takes 6x as long as the rest of the decode kernel combined, so hitting the memory again later doesn't take as long (probably because we don't need to be single-threaded there).
CC: @vuule for viz only
mhaseeb123
left a comment
Flush comments so far
For non-dictionary string columns, the strings are laid out in memory as: 4-byte length, string data, 4-byte length, string data, and so on. With this layout we cannot read the lengths in parallel, as we don't know where in global memory the next length value is without knowing the previous one. This is the dominant bottleneck for non-dictionary, non-Fixed-Length-Byte-Array string columns (in Fixed-Length-Byte-Array columns all string lengths are the same), taking six times as long as the rest of the decode.
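To make the serial dependency concrete, here is a minimal sketch (hypothetical names, not the actual cudf code) of the walk this encoding forces: each 4-byte length must be read before the position of the next one is known.

```cuda
#include <cstdint>

// Each offset depends on the previous length, so this loop cannot be parallelized.
__device__ void walk_plain_string_lengths(uint8_t const* page_data,
                                          uint32_t data_size,
                                          uint32_t* out_offsets,
                                          int num_strings)
{
  uint32_t pos = 0;
  for (int i = 0; i < num_strings && pos + 4 <= data_size; ++i) {
    // 4-byte little-endian length prefix, read from global memory
    uint32_t const len = uint32_t{page_data[pos]} | (uint32_t{page_data[pos + 1]} << 8) |
                         (uint32_t{page_data[pos + 2]} << 16) |
                         (uint32_t{page_data[pos + 3]} << 24);
    out_offsets[i] = pos;  // position of this string's length field
    pos += 4 + len;        // only now do we know where the next length lives
  }
}
```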
This PR moves the reading of the string lengths out of the main decode kernel and into a preprocessing kernel. In this kernel we allocate a small shared-memory buffer that all threads in the warp cooperatively prefetch into from global memory. Then, when thread 0 extracts the string lengths, it reads from shared memory instead of global memory, so we spend much less time bottlenecked on a single thread.
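A rough sketch of that scheme follows; the buffer size, names, and launch shape are assumptions, and the real kernel additionally handles nulls, nesting, and error cases. All 32 lanes cooperatively copy a window of page data into shared memory, then lane 0 walks that window serially, reading the lengths from shared memory.

```cuda
#include <cstdint>

constexpr uint32_t PREFETCH_BYTES = 1024;  // assumed 1 KB buffer from the description

// Sketch: launched with one warp (32 threads) per page.
__global__ void preprocess_string_offsets(uint8_t const* page_data,
                                          uint32_t data_size,
                                          uint32_t* out_offsets,
                                          int num_strings)
{
  __shared__ uint8_t buf[PREFETCH_BYTES];
  int const lane = threadIdx.x;

  uint32_t window_start = 0;  // start of the prefetched window within the page
  uint32_t pos          = 0;  // absolute position of the next 4-byte length field
  int i                 = 0;  // index of the next string

  while (i < num_strings && pos + 4 <= data_size) {
    // All lanes cooperatively prefetch the next window into shared memory.
    for (uint32_t b = lane; b < PREFETCH_BYTES && window_start + b < data_size; b += 32) {
      buf[b] = page_data[window_start + b];
    }
    __syncwarp();

    if (lane == 0) {
      // Walk every length field that lies fully inside the prefetched window.
      while (i < num_strings && pos + 4 <= data_size &&
             pos + 4 <= window_start + PREFETCH_BYTES) {
        uint32_t const rel = pos - window_start;
        uint32_t const len = uint32_t{buf[rel]} | (uint32_t{buf[rel + 1]} << 8) |
                             (uint32_t{buf[rel + 2]} << 16) | (uint32_t{buf[rel + 3]} << 24);
        out_offsets[i++] = pos;
        pos += 4 + len;
      }
    }
    // Broadcast lane 0's cursor to the whole warp and slide the window to it.
    pos          = __shfl_sync(0xffffffff, pos, 0);
    i            = __shfl_sync(0xffffffff, i, 0);
    window_start = pos;
    __syncwarp();
  }
}
```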
However, if the average string length is large, prefetching a buffer into shared memory is slower: when only a few strings fit into the buffer, we spend too much time prefetching data that we mostly don't need. Benchmarking showed that this performance dropoff occurs around the point where 16 length-64 strings fit into the 1 KB shared-memory buffer. Above this cutoff we therefore revert to a fully single-threaded, non-buffering algorithm. For string lengths of 128 and above there was no performance change.
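As a hedged illustration of that cutoff (the threshold, buffer size, and names here are assumptions derived from the numbers above, not the PR's actual code):

```cuda
#include <cstdint>

// Prefetching only pays off when a window holds many strings; otherwise fall
// back to the plain single-threaded walk over global memory.
__device__ inline bool use_shared_memory_prefetch(uint32_t page_string_bytes,
                                                  uint32_t page_num_strings)
{
  constexpr uint32_t prefetch_bytes = 1024;  // assumed 1 KB buffer
  uint32_t const avg_encoded_size =
    page_string_bytes / (page_num_strings == 0 ? 1u : page_num_strings);
  // Crossover reported above: roughly 16 strings of length 64 per 1 KB window
  return avg_encoded_size * 16 <= prefetch_bytes;
}
```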
There are several advantages to preprocessing instead of doing this in the main decode kernel. First, by having only one warp per page instead of 4 warps per page as in the decode kernel, we can go much wider, and fewer threads are stuck behind the bottleneck. But primarily, we have significantly less shared-memory and register pressure, and can safely devote a lot more memory to the prefetch buffer without reducing occupancy.
The performance gains:
I attempted double-buffering the data, having one warp fill half of the prefetch buffer while thread 0 of the other warp read from the other half, but this was slower than single-buffering. Comparing the two schemes with two warps (64 threads) and 2 KB of shared memory, you can do either:
The increased speed from single-buffering is because the other 62 threads are probably not waiting very long for the two thread 0s to extract the lengths, so effectively all 64 threads prefetch, instead of just 32 in the double-buffering scheme.
Instead of saving both string lengths and offsets as before, only the offsets are saved now, as the string lengths can be derived from them. Also, the string lengths for fixed-length-byte-array columns are known from the column data header, so they aren't saved at all. Finally, this preprocessing allowed me to free some shared memory from the main strings decode kernel, since some of its buffers are no longer needed.
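For reference, a small sketch (hypothetical name) of how a length is recovered from two consecutive stored offsets, matching the `next_offset - input_thread_string_offset - 4` expression visible in the diff quoted above:

```cuda
#include <cstdint>

// Each stored offset points at a 4-byte length field; the gap between two
// consecutive offsets minus that prefix is the string's length in bytes.
__device__ inline uint32_t string_length_from_offsets(uint32_t this_offset,
                                                      uint32_t next_offset)
{
  return next_offset - this_offset - 4;
}
```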
Checklist