-
Notifications
You must be signed in to change notification settings - Fork 882
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor vectorized grouping to prepare for hash grouping #7408
base: main
Are you sure you want to change the base?
Conversation
This commit has various assorted refactorings and cosmetic changes: * Various cosmetic things I don't know where to put. * The definitions of aggregate functions and grouping columns in the vector agg node are now typed arrays and not lists. * The aggegate function implementation always work with at most one filter bitmap. This reduces the amount of code and will help to support the aggregate FILTER clauses. * Parts of the aggregate function implementations are restructured and renamed in a way that will make it easier to support hash grouping. * EXPLAIN output is added for vector agg node that mentions the grouping policy that is being used.
@erimatnor, @svenklemm: please review this pull request.
|
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #7408 +/- ##
==========================================
+ Coverage 80.06% 82.48% +2.41%
==========================================
Files 190 228 +38
Lines 37181 42631 +5450
Branches 9450 10702 +1252
==========================================
+ Hits 29770 35164 +5394
- Misses 2997 3184 +187
+ Partials 4414 4283 -131 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is going in the right direction. I think there might be an issue with the agg init function handling though. See inline comments.
Look it over, and check if I am right about that or not.
Otherwise, I also have some suggestion for improving the clarity of the VectorAgg functions interface. Again, see inline comments. I think you can decide yourself which direction you want to go, I am just suggesting and pointing out potential issues.
Look through my comments and after addressing I think this would be ready.
*/ | ||
int agg_functions_counter = 0; | ||
int grouping_column_counter = 0; | ||
for (int i = 0; i < tlist_length; i++) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any particular reason for not using a foreach
loop here? I don't the see the index i
being used for anything except getting the list elements.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to cosmetically match the same loop below where i
is required.
{ | ||
FUNCTION_NAME(state) *state = (FUNCTION_NAME(state) *) agg_state; | ||
*state = (FUNCTION_NAME(state)){ 0 }; | ||
FUNCTION_NAME(state) *states = (FUNCTION_NAME(state) *) agg_states; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason this can't also do the allocation of the array and return it? n
is given, so should be possible. And, internally the type and size of the type is known.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
init
function is not supposed to allocate memory always, it also initializes the aggregate state in the already allocated uninitialize memory (after grouping policy is reset).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having separate alloc and init is fine, but then why not have an alloc/create function in the interface, which is internally alloc+init? It seems like there's a disconnect when separating the operations on the state from knowledge about its structure and size. These functions should be able to just know the size and internal state without the caller having to tell it via extra parameters.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fact that the aggregate states of a single function for many keys are laid out contiguously in memory is an architectural assumption that is relied on by both the hash grouping policy and the aggregate function handler. There's no way to move it completely into one of them and make the other agnostic.
Technically I could move the allocation into some wrapper around aggregate function handler, that is common for all of them, but this doesn't achieve any better separation of concerns, so all this extra code would be there for no reason.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Quoting a comment from my drafts:
/*
* Hash grouping policy.
*
* The grouping and aggregation is performed as follows:
*
* 0) The grouping policy keeps track of the unique grouping keys seen in
* the input rows, and the states of aggregate functions for each key. This
* spans multiple input compressed batches, and is reset after the partial
* aggregation results are emitted.
*
* 1) For each row of the new compressed batch, we obtain an integer index that
* uniquely identifies its grouping key. This is done by matching the row's
* grouping columns to the hash table recording the unique grouping keys and
* their respective indexes. It is performed in bulk for all rows of the batch,
* to improve memory locality.
*
* 2) The key indexes are used to locate the aggregate function states
* corresponding to a given row, and update it. This is done in bulk for all
* rows of the batch, and for each aggregate function separately, to generate
* simpler and potentially vectorizable code, and improve memory locality.
*
* 3) After the input have ended, or if the memory limit is reached, the partial
* results are emitted into the output slot. This is done in the order of unique
* grouping key indexes, thereby preserving the incoming key order. This
* guarantees that this policy works correctly even in a Partial GroupAggregate
* node, even though it's not optimal performance-wise.
*/
The fact that you can use the key indexes that are the same for every aggregate function requires this memory layout. There's no point in trying to kinda "hide" this requirement by moving the code around, it's the basic assumption of the algorithm.
#include "agg_const_helper.c" | ||
static pg_attribute_always_inline void | ||
FUNCTION_NAME(one)(void *restrict agg_state, const CTYPE value) | ||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we do something about the test coverage here? Looks like dead code right now.
agg_def->func->agg_init(agg_state); | ||
VectorAggDef *agg_def = &policy->agg_defs[i]; | ||
void *agg_state = policy->agg_states[i]; | ||
agg_def->func.agg_init(agg_state, 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this really correct?
agg_init() is called in a loop over n
but most, if not all, agg_init()-functions now also loop over the agg_state array. Seems this would lead to writes outside the intended memory area, or am I missing something?
You should double-check this.
If you have the agg_state array and you know n
I think agg_init
can do all the initialization (including allocation of the array) internally. It seems safer than having it spread across multiple places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, should this really be agg_init(agg_state, 1) and not agg_init(agg_state, i)?
agg_def->func.agg_init(agg_state, 1); | |
agg_def->func.agg_init(agg_state, i); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohh, I get it now. This is kind of confusing.
You are first getting the offset to the individual agg_state element in an outer loop, then calling agg_init() on that element and then having it run an inner loop internally of 1 element. This is very confusing.
Why not just pass in the vector array to agg_init() and giving the right size without having the outer loop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, because this is just a preparatory refactoring PR, this is only called with n = 1
, so might look confusing. But in the hash aggregation PR, it's going to be used to initialize the aggregate states in bulk. So I pass the pointer of the first state and the number of them. Is there some alternative?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess I still don't understand why you need this outer loop here, end then an inner loop in the init function that you are essentially not using.
Why not just replacing the outer loop here with a single call to agg_def->func.agg_init(policy->agg_states, naggs)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The outer loop is looping over different aggregate functions, each of them has its own state. Like select count(*), count(x), sum(x) from table
, you have three VectorAggDef
s here and have to do something different for each.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I was unclear. I understand why the outer loop exists, but I don't understand why there's an inner loop that is not used because you are only ever calling agg_init
on the first entry in the the agg_state
array. For example, float_sum_init(), which I assume can be called via the agg_init
pointer is now a loop over n
values where n is hard-coded to 1 here instead of passing in the number of entries of the agg_state
array.
This is why I proposed the VectorAggState struct, which would have the number of entries allocated to the agg_state
array so that agg_init
would know how to initialize the entire array.
But I am probably missing something. But there is something that confuses me with this code, which might be an indication of something.
/* Initialize the aggregate function state pointed to by agg_value and agg_isnull. */ | ||
void (*agg_init)(void *agg_state); | ||
/* Initialize the aggregate function states. */ | ||
void (*agg_init)(void *restrict agg_states, int n); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see some potential confusion over the vector agg interface here:
- Some functions operate over the entire agg state array
- While other functions operate over a single element (relying on being passed a pointer to the right offset).
- It seems the semantics of some functions are changing (e.g.,
agg_init
seems to have gone from single element to array. And, I think there's maybe a bug related to this).
I think we should pick one approach or the other, or make it very clear from the function when it is single element or the entire array. Right now we just pass around the agg_state pointer and it is unclear whether it should point to the start of the array or a particular element.
IMO, I think we should probably mostly make this interface work over the entire array. (It is a vector interface after all). Let's make that explicit.
To that end, I am also wondering if we can make this struct/class self-contained to alleviate some of the above issues?
For example, why not have the agg_state
array and n
part of the struct instead of separating them? Then all agg functions will always take the struct pointer as first argument and it will always have access to the start of the array and the size. No ambiguity. If a particular element is to be accessed or updated, the function takes an offset arg.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current concept is that aggregate function state is per-key. Then, there are functions for working with one state and many states in a contiguous array. The latter are not in this PR but in the hash aggregation one, except of the init
. They have a suffix many
and take different parameters. Functions for one state are required to generate more efficient code for full-batch aggregation, we could've done w/o them if this was less performance-sensitive.
Sounds like you want another thing that is "array of aggregate function states", that should combine an implementation of resizable array with an implementation of aggregate transition functions. Currently the "resizable array" part is done by a couple of lines in the hash grouping strategy, I think it's only going to complicate things if we somehow try to move these lines into a wrapper around every aggregate function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alloc and resize functions don't need to be a wrapper around every agg function. I am just arguing for putting things together that I think belong together for clarity instead of spread out across different parts of the code. It would make the vectoragg state stuff more into a reusable library. It is already, more or less, but it can be improved further, IMO.
What I mean is something like this:
typedef struct VectorAggState VectorAggState;
typedef struct
{
/* Size of the aggregate function state. */
size_t state_bytes;
/* Initialize the aggregate function states. */
void (*agg_init)(VectorAggState *state);
/* Aggregate a given arrow array. */
void (*agg_vector)(VectorAggState *state, const ArrowArray *vector, const uint64 *filter,
MemoryContext agg_extra_mctx);
/* Aggregate a scalar value, like segmentby or column with default value. */
void (*agg_scalar)(VectorAggState *state, Datum constvalue, bool constisnull, int n,
MemoryContext agg_extra_mctx);
/* Emit a partial aggregation result. */
void (*agg_emit)(VectorAggState *state, Datum *out_result, bool *out_isnull);
} VectorAggFunctions;
typedef struct VectorAggState
{
const VectorAggFunctions *funcs;
int num_states;
// Does it make sense to have a memory context here?
unsigned char states[FLEXIBLE_ARRAY_MEMBER];
} VectorAggState;
/* Initialize the aggregate function states. A resize function would be analogous */
VectorAggState *
vector_agg_state_create(const VectorAggFunctions *funcs, int num_states)
{
VectorAggState *state = palloc(sizeof(VectorAggState) + funcs->state_bytes * num_states);
state->funcs = funcs;
state->num_states = num_states;
funcs->agg_init(state);
return state;
}
#define GET_AGGSTATE(state, i) \
((state)->states + (state)->funcs->state_bytes * i)
static void
minmax_init(VectorAggState *state)
{
for (int i = 0; i < state->num_states; i++)
{
MinMaxState *mmstate = GET_AGGSTATE(state, i);
...
}
}
const VectorAggFunctions minmax_funcs = {
.state_bytes = sizeof(MinMaxState),
.agg_init = minmax_init,
...
};
The VectorAggState
struct always keeps the state array, the size, length, and functions together instead of being separate. Passing this pointer to functions is easier and more clear than having these things passed separately. It also makes it much less likely to make a mistake w.r.t. indexing, size, etc.
If you want to update or work with a particular agg state that is possible to (pseuo-code):
static void
minmax_update(VectorAggState *state, int pos, int value)
{
MinMaxState *mmstate = GET_AGGSTATE(state, pos);
mmstate->min = value;
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like in this interface, the number of states of different functions can differ? This is not correct. The states of one function are stored contiguously in memory, but the indexes for the same key across different functions must be the same.
Let's continue this discussion after we have the hash grouping policy, the requirements will be more clear.
CountState *states = (CountState *) agg_states; | ||
for (int i = 0; i < n; i++) | ||
{ | ||
states[i].count = 0; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why should we have to pass n
here? Wouldn't it be easier if n
(the number of elements) be part of the agg_state that gets passed in?
For example:
CountState *states = (CountState *) agg_states; | |
for (int i = 0; i < n; i++) | |
{ | |
states[i].count = 0; | |
} | |
static void | |
count_init(AggState *aggstate) | |
{ | |
CountState *states = (CountState *) aggstate->states; | |
for (int i = 0; i < aggstate->num_states; i++) | |
{ | |
states[i].count = 0; | |
} |
Or ideally, with allocation:
CountState *states = (CountState *) agg_states; | |
for (int i = 0; i < n; i++) | |
{ | |
states[i].count = 0; | |
} | |
static AggState * | |
count_create(int num_states) | |
{ | |
AggState *state = palloc0(sizeof(AggState) + sizeof(CountState) * num_states); | |
state->num_states = num_states; | |
// Below not really necessary with palloc0 | |
CountState *states = (CountState *) &aggstate->states[0]; | |
for (int i = 0; i < aggstate->num_states; i++) | |
{ | |
states[i].count = 0; | |
} | |
return state; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
n
is a number of separate aggregate states that are supposed to be initialized, e.g. when we've allocated new aggregate states for new 100 grouping keys and initialize them in bulk. It's not a part of a single aggregate state for a single grouping key.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The proposed pattern can easily be updated to handle resize (see new suggestion is separate comment).
Co-authored-by: Erik Nordström <[email protected]> Signed-off-by: Alexander Kuzmenkov <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am approving this, although there are some things I still don't really understand and that look a bit odd to me. I commented on these things separately. Maybe it will become more clear when hash grouping is added and we can defer until then. Otherwise I don't see any obvious bugs, but there are some missing test coverage that would be nice to fix.
This PR prepares for #7341
It has various assorted refactorings and cosmetic changes:
No functional changes are expected except for the EXPLAIN output.
Disable-check: force-changelog-file