chore: Refactor dataobj index builder to handle buffered events from stale partitions #19189
base: main
Conversation
Thanks for this - it looks good apart from a couple of questions.
I had a thought about the approach: would it be simpler to use a mutex or semaphore within buildIndex? That way you don't need to coordinate across goroutines and dispatch work; each entrypoint could just call buildIndex directly and wait for it to complete. I may be missing some nuance, however!
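For illustration, a minimal sketch of that idea, assuming a hypothetical `Builder` type and `buildIndex` signature (not the PR's actual code): a mutex inside `buildIndex` serializes builds, so every entrypoint can call it directly and block until its own build finishes.

```go
package index

import (
	"context"
	"sync"
)

// event stands in for whatever the builder buffers per partition.
type event struct{}

// Builder is a simplified stand-in for the real index builder.
type Builder struct {
	buildMu sync.Mutex // serializes index builds across callers
}

// buildIndex can be called from any entrypoint; the mutex guarantees
// only one build runs at a time, and each caller simply waits for its
// own build to complete.
func (b *Builder) buildIndex(ctx context.Context, events []event) error {
	b.buildMu.Lock()
	defer b.buildMu.Unlock()

	// ... download objects, build the index, commit offsets ...
	_ = ctx
	_ = events
	return nil
}
```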
pkg/dataobj/index/builder.go
Outdated
switch tt {
case triggerTypeAppend:
	return "append"
case triggerTypeFlush:
nit: maybe this should be "max-age" or something instead of flush?
Considering that the config is called max-idle-time, can we say triggerTypeMaxIdle is the winner?
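A hedged sketch of how that rename could look, based only on the snippet above; the exact constant set and `String` method are assumptions.

```go
package index

type triggerType int

const (
	triggerTypeAppend  triggerType = iota
	triggerTypeMaxIdle // previously triggerTypeFlush; fires once a partition exceeds max-idle-time
)

func (tt triggerType) String() string {
	switch tt {
	case triggerTypeAppend:
		return "append"
	case triggerTypeMaxIdle:
		return "max-idle"
	default:
		return "unknown"
	}
}
```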
	processingErrors.Add(fmt.Errorf("failed to download object: %w", obj.err))
	continue
}
p.wg.Add(1)
Does p.wg.Wait() ever get called?
Yes, p.wg.Wait() is called in stopping(). AFAICT this is OK:
- Add(1) is called for the flush ticker routine and for each async partition flush routine.
- Done is called when the flush ticker routine and each async partition flush routine exit.
- Wait is called in stopping, waiting for all routines to end before closing the client.

Did I miss something here?
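A hedged sketch of the lifecycle described above, with simplified, hypothetical names; the real builder's routines differ, but the counting pattern is the same.

```go
package index

import "sync"

// closer is a placeholder for the real client's interface.
type closer interface{ Close() }

type Builder struct {
	wg     sync.WaitGroup
	quit   chan struct{}
	client closer
}

// The flush ticker routine registers itself before starting and signals
// completion when it exits.
func (p *Builder) startFlushTicker() {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		// ... tick and dispatch flushes until quit is closed ...
		<-p.quit
	}()
}

// Each async partition flush routine does the same.
func (p *Builder) flushPartitionAsync(partition int32) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		// ... flush buffered events for this partition ...
		_ = partition
	}()
}

// stopping waits for every registered routine to exit before closing the
// client, so no goroutine can touch a closed client.
func (p *Builder) stopping() error {
	close(p.quit)
	p.wg.Wait()
	p.client.Close()
	return nil
}
```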
pkg/dataobj/index/builder.go
Outdated
		return nil, nil
	}
case triggerTypeFlush:
	if len(state.events) < p.cfg.MinFlushEvents {
I think this condition & flag can be removed. If something is older than the MaxIdleTime, we need to flush it anyway even if it means it'll be a small index.
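If the flag goes away, the idle trigger can flush whatever is buffered unconditionally. A hypothetical sketch of what the simplified branch might look like (field and type names are assumed from the snippet above, not the actual implementation):

```go
package index

type event struct{}

type partitionState struct {
	events []event
}

type config struct {
	EventsPerIndex int
}

type Builder struct{ cfg config }

type triggerType int

const (
	triggerTypeAppend triggerType = iota
	triggerTypeMaxIdle
)

// eventsToFlush returns the buffered events to index, or nil if nothing
// should be flushed yet. With MinFlushEvents removed, the idle trigger
// flushes whatever is buffered, even if the resulting index is small.
func (p *Builder) eventsToFlush(tt triggerType, state *partitionState) []event {
	switch tt {
	case triggerTypeAppend:
		if len(state.events) < p.cfg.EventsPerIndex {
			return nil
		}
	case triggerTypeMaxIdle:
		if len(state.events) == 0 {
			return nil
		}
	}
	return state.events
}
```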
pkg/dataobj/index/indexer.go
Outdated
}

// Extract records for committing
records := make([]*kgo.Record, len(req.events))
Very minor performance optimization, but records isn't used unless the build was successful, so you could do this after the error check later.
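A hedged sketch of that reorder, with a contrived signature: the build runs first, and the records slice for the commit is only allocated once the build has succeeded. `kgo.Record` is from franz-go; everything else here is a placeholder, not the PR's actual code.

```go
package index

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

type partitionEvent struct {
	record *kgo.Record
}

type buildRequest struct {
	events []partitionEvent
}

// processBuild builds the index first; the records slice used for the
// offset commit is only allocated after the build has succeeded.
func processBuild(
	ctx context.Context,
	req buildRequest,
	build func(context.Context) error,
	commit func(context.Context, []*kgo.Record) error,
) error {
	if err := build(ctx); err != nil {
		return fmt.Errorf("failed to build index: %w", err)
	}

	// Extract records for committing, now that the build succeeded.
	records := make([]*kgo.Record, len(req.events))
	for i, e := range req.events {
		records[i] = e.record
	}
	return commit(ctx, records)
}
```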
pkg/dataobj/index/indexer.go
Outdated
	// Successfully sent event for download
case <-ctx.Done():
	return "", ctx.Err()
default:
is this default case needed?
If the channel is closed, the context should already have been cancelled and would be caught at the start of the loop
You are right. For a moment a test was failing and this was the fix, but I never got back to review it properly.
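For reference, a minimal sketch of the send loop without the default branch, assuming a hypothetical `downloadRequest` type and queue channel; shutdown is detected purely through the context.

```go
package index

import "context"

type downloadRequest struct{}

// enqueueDownloads sends events to the download queue, relying on
// context cancellation (checked at the top of the loop and in the
// select) instead of a default branch.
func enqueueDownloads(ctx context.Context, queue chan<- downloadRequest, reqs []downloadRequest) error {
	for _, req := range reqs {
		if err := ctx.Err(); err != nil {
			return err
		}
		select {
		case queue <- req:
			// Successfully sent event for download.
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}
```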
pkg/dataobj/index/builder.go
Outdated
}

func (p *Builder) cleanupPartition(partition int32) {
	p.partitionsMutex.Lock()
	defer p.partitionsMutex.Unlock()

	p.cancelActiveCalculation(nil)
	// Cancel active calculation for this partition
	p.calculationsMutex.Lock()
I think calculations mutex is always acquired under the partitionsMutex - are they both needed?
You are right! The calculationsMutex was in play because I also use it in stopping, which does not rely on the partitionsMutex. However, now that context propagation is refactored to pass through the functions, we can rely on the Go pattern of letting context cancellation drive cleanup.
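A hedged sketch of that pattern with hypothetical names: each partition keeps the cancel function of its own derived context, so `cleanupPartition` just cancels it and lets the in-flight calculation tear itself down, with no second mutex.

```go
package index

import (
	"context"
	"sync"
)

// partitionWork holds the cancel function for a partition's derived context.
type partitionWork struct {
	cancel context.CancelFunc
}

type Builder struct {
	partitionsMutex sync.Mutex
	partitions      map[int32]*partitionWork
}

// cleanupPartition cancels the partition's context and drops its state.
// Cancellation propagates to any active calculation started under that
// context, so no separate calculationsMutex is needed.
func (p *Builder) cleanupPartition(partition int32) {
	p.partitionsMutex.Lock()
	defer p.partitionsMutex.Unlock()

	if w, ok := p.partitions[partition]; ok {
		w.cancel()
		delete(p.partitions, partition)
	}
}
```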
Practically, yes, you are right. However, I decided to use channels for the buildWorker to stay similar to the downloadWorker. For now the reason to keep only one buildWorker is CPU usage, but we may lift this and add more workers later. WDYT? Edit: one more thing that came to mind when I built this is that downloading and processing are two independent queues, so we can better observe later where things go wrong or slow down.
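A minimal sketch of the two-queue idea under those assumptions: downloading and building are separate stages connected by channels, so each queue's depth can be observed on its own and the single build worker can later become a pool. All names here are illustrative.

```go
package index

import (
	"context"
	"sync"
)

type downloadJob struct{}
type buildJob struct{}

// runPipeline wires a pool of download workers to a single build worker
// through two channels. Instrumenting each channel separately shows
// whether downloading or building is the slow stage.
func runPipeline(ctx context.Context, downloads <-chan downloadJob, downloadWorkers int) {
	builds := make(chan buildJob, 16)

	var wg sync.WaitGroup
	for i := 0; i < downloadWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range downloads {
				// ... download the object, then hand it to the build stage ...
				select {
				case builds <- buildJob{}:
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	// Close the build queue once every download worker has exited.
	go func() {
		wg.Wait()
		close(builds)
	}()

	// Single build worker for now; CPU usage is the reason to keep it at
	// one, and more workers can be added later.
	for range builds {
		// ... build and flush the index ...
	}
}
```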
What this PR does / why we need it:
This pull request is a medium-sized refactoring of the dataobj index builder to support handling stale partitions that have buffered events but fewer than Config.EventsPerIndex. In particular, the builder now feeds the indexer (indexer.go) via a Go channel.

Which issue(s) this PR fixes:
Fixes grafana/loki-private#1967
Special notes for your reviewer:
Checklist
- Reviewed the CONTRIBUTING.md guide (required)
- feat PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
- Changes that require user attention or interaction to upgrade are documented in docs/sources/setup/upgrade/_index.md
- If the change is deprecating or removing a configuration option, update the deprecated-config.yaml and deleted-config.yaml files respectively in the tools/deprecated-config-checker directory. Example PR