loki.write: implement sharding #4882
base: main
Conversation
Force-pushed 4326897 to 6f05b9e
Pull Request Overview
This PR implements queue-based sharding for the loki.write component, introducing a new architecture that distributes log entries across multiple parallel queues based on label fingerprints. The implementation unifies the handling of both normal and WAL-enabled clients through a shared shards structure, eliminating significant code duplication. The queue_config block, previously WAL-only, now applies to all endpoints and controls both queue capacity and shard count.
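The fingerprint-based distribution described above can be sketched roughly as follows. This is an illustrative sketch, not the PR's actual code: the names (`shardFor`) are hypothetical, and the label fingerprint is approximated here with an FNV-1a hash over the sorted label set.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// shardFor picks a queue index for a log entry from its label set.
// Hypothetical sketch: the PR shards by label fingerprint (as Prometheus
// remote_write does); here the fingerprint is approximated with FNV-1a.
func shardFor(labels map[string]string, numShards int) int {
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic order so equal label sets hash equally
	h := fnv.New64a()
	for _, k := range keys {
		h.Write([]byte(k))
		h.Write([]byte{0xff}) // separator to avoid ambiguous concatenations
		h.Write([]byte(labels[k]))
		h.Write([]byte{0xff})
	}
	return int(h.Sum64() % uint64(numShards))
}

func main() {
	labels := map[string]string{"app": "api", "env": "prod"}
	// Entries with the same label set always land on the same shard,
	// which preserves per-stream ordering within that shard.
	fmt.Println(shardFor(labels, 4) == shardFor(labels, 4)) // true
}
```

The key property is that a given stream always maps to the same shard, so ordering within a stream is preserved while different streams are processed in parallel.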
Key changes:
- Introduces `shards.go` with a new sharding architecture for parallel processing via multiple queues
- Refactors WAL and fanout clients to use the shared shards implementation, removing ~500 lines of duplicated code
- Adds a `min_shards` configuration parameter to control the parallelism level
Reviewed Changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| internal/component/loki/write/types.go | Adds MinShards field to QueueConfig and updates documentation to reflect queue config is now always used |
| internal/component/common/loki/client/shards.go | New file implementing the core sharding logic with queue management and parallel batch sending |
| internal/component/common/loki/client/shards_test.go | Comprehensive test coverage for queue operations including append, drain, and flush/shutdown scenarios |
| internal/component/common/loki/client/consumer_wal.go | Refactored to delegate batching and sending to the shards implementation, significantly simplified |
| internal/component/common/loki/client/consumer_fanout.go | Refactored to use shards implementation, removing duplicated send/batch logic |
| internal/component/common/loki/client/config.go | Adds MinShards field definition to QueueConfig struct |
| docs/sources/reference/components/loki/loki.write.md | Documents the new min_shards parameter and clarifies queue config usage |
thampiotr
left a comment
I still think we need to get a coherent story with naming established and then make sure it is reflected in docs and in the code. But we're on the right track now.
> ### `queue_config`
>
> {{< docs/shared lookup="stability/experimental_feature.md" source="alloy" version="<ALLOY_VERSION>" >}}
Shouldn't this continue to be experimental?
Yeah, we could keep this as experimental, but after this PR we would always use this config. What would be considered experimental with it would be the naming and changing defaults, I guess.
> | Name | Type | Description | Default | Required |
> | --- | --- | --- | --- | --- |
> | `capacity` | `string` | Controls the size of the underlying send queue buffer. This setting should be considered a worst-case scenario of memory consumption, in which all enqueued batches are full. | `10MiB` | no |
```diff
-| `capacity` | `string` | Controls the size of the underlying send queue buffer. This setting should be considered a worst-case scenario of memory consumption, in which all enqueued batches are full. | `10MiB` | no |
+| `capacity` | `string` | Controls the size of the underlying send queue buffer of each shard. Consider this setting as the worst-case scenario of memory consumption, in which all enqueued batches are full. | `10MiB` | no |
```

What does 'all enqueued batches are full' even mean? Shouldn't it say that it's the total size of all the enqueued batches instead?
Yeah, this was there before and I didn't check or alter it.
But essentially, whenever the capacity is full, the queue of batches is full and we cannot enqueue another one, so we block here until we get more capacity.
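The blocking behaviour described above can be illustrated with a minimal sketch. This is not the PR's actual code: the type and method names below are hypothetical, and only the semantics (enqueue blocks once the total bytes of enqueued batches reach the capacity) are taken from the discussion.

```go
package main

import (
	"fmt"
	"sync"
)

// byteBoundedQueue is an illustrative sketch of the capacity semantics:
// Enqueue blocks once the total size of enqueued batches reaches capacity.
type byteBoundedQueue struct {
	mu       sync.Mutex
	notFull  *sync.Cond
	capacity int
	used     int
	batches  [][]byte
}

func newQueue(capacity int) *byteBoundedQueue {
	q := &byteBoundedQueue{capacity: capacity}
	q.notFull = sync.NewCond(&q.mu)
	return q
}

// Enqueue blocks until the batch fits under the capacity limit.
func (q *byteBoundedQueue) Enqueue(batch []byte) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for q.used+len(batch) > q.capacity {
		q.notFull.Wait() // block until a dequeue frees capacity
	}
	q.batches = append(q.batches, batch)
	q.used += len(batch)
}

// Dequeue pops the oldest batch and wakes blocked producers.
func (q *byteBoundedQueue) Dequeue() []byte {
	q.mu.Lock()
	defer q.mu.Unlock()
	b := q.batches[0]
	q.batches = q.batches[1:]
	q.used -= len(b)
	q.notFull.Broadcast()
	return b
}

func main() {
	q := newQueue(10)
	q.Enqueue(make([]byte, 6))
	done := make(chan struct{})
	go func() {
		q.Enqueue(make([]byte, 6)) // blocks: 6+6 > 10
		close(done)
	}()
	fmt.Println(len(q.Dequeue())) // frees capacity; the blocked enqueue proceeds
	<-done
	fmt.Println("second enqueue completed")
}
```

This also shows why drain-on-shutdown matters: if nothing ever dequeues, a blocked producer would wait forever.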
> | `capacity` | `string` | Controls the size of the underlying send queue buffer. This setting should be considered a worst-case scenario of memory consumption, in which all enqueued batches are full. | `10MiB` | no |
> | `drain_timeout` | `duration` | Configures the maximum time the client can take to drain the send queue upon shutdown. During that time, it enqueues pending batches and drains the send queue, sending each. | `"1m"` | no |
> | `min_shards` | `number` | Minimum amount of concurrent shards sending samples to the endpoint. | `1` | no |
Should we consider calling it something more intuitive than shards? I feel this term is just inherited from Prometheus 🤔
Discussed this in Slack; the naming is fine because it matches what we use in `prometheus.remote_write`.
```diff
-The optional `queue_config` block configures, when WAL is enabled, how the underlying client queues batches of logs sent to Loki.
-Refer to [Write-Ahead block](#wal) for more information.
+The optional `queue_config` block configures how the endpoint queues batches of logs sent to Loki.
```
We call it `queue_config`, but we are actually configuring both sharding and queuing in one block.
Maybe this should be called something like `parallelism` in `write.queue`?
Discussed this in Slack; the naming is fine because it matches what we use in `prometheus.remote_write`.
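For reference, a hedged sketch of how the block would be used after this PR. The endpoint URL and values are illustrative; the attribute names (`capacity`, `drain_timeout`, `min_shards`) are taken from the table discussed above.

```alloy
loki.write "default" {
  endpoint {
    url = "https://loki.example.com/loki/api/v1/push"

    queue_config {
      capacity      = "10MiB"
      drain_timeout = "1m"
      min_shards    = 4
    }
  }
}
```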
In ffb2bec I renamed the two client implementations we have to `endpoint` and `walEndpoint`. I will look into the other ways we discussed structuring this, but will keep this as a fallback if that doesn't work out.
Pull Request Overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 11 comments.
Comments suppressed due to low confidence (1)
internal/component/common/loki/client/consumer_wal.go:135
`sync.WaitGroup` does not have a `Go` method. The standard library's `sync.WaitGroup` has `Add()`, `Done()`, and `Wait()` methods. This should be:

```go
stopWG.Add(1)
go func() {
	defer stopWG.Done()
	pair.Stop(drain)
}()
```

instead of:

```go
stopWG.Go(func() {
	pair.Stop(drain)
})
```
Force-pushed 6132ec6 to eaac392
@thampiotr I updated the PR with an attempt to share stuff between the non-WAL and WAL implementations. So we now have one `endpoint` struct that will handle shards. This implementation has one method we can use directly in `Fanout`, and we no longer need channels between fanout and endpoint. For the WAL implementation, I renamed it `walEndpointAdapter`. This implements the interface that the watcher expects and will just call enqueue on the endpoint, which internally handles retries etc. Naming could be a bit off, but this would be an option we can go with. Let me know what you think :)
Pull Request Overview
Copilot reviewed 13 out of 13 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (1)
internal/component/common/loki/client/consumer_wal.go:134
`sync.WaitGroup` does not have a `Go()` method. This code will not compile. You should either:

- Use the `stopWG.Add(1)` and `go func(p endpointWatcherPair) { defer stopWG.Done(); p.Stop(drain) }(pair)` pattern (note: capture `pair` as a parameter to avoid closure issues)
- Or use `golang.org/x/sync/errgroup`, which has a `Go()` method

```go
stopWG.Go(func() {
	pair.Stop(drain)
})
```
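A self-contained sketch of the stdlib `Add`/`Done` pattern suggested above. The `pairStub` type and `stopAll` helper are hypothetical stand-ins for the PR's `endpointWatcherPair` handling; note also that `sync.WaitGroup.Go` does exist from Go 1.25 on, so the Copilot comment only applies to earlier toolchains.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pairStub is a hypothetical stand-in for the PR's endpointWatcherPair.
type pairStub struct{ id int }

func (p pairStub) Stop(drain bool) { fmt.Println("stopped pair", p.id) }

// stopAll stops every pair concurrently with the Add/Done pattern,
// passing each pair as a goroutine parameter so the closure never
// captures the loop variable, then waits for all Stop calls to return.
func stopAll(pairs []pairStub, drain bool) int {
	var stopWG sync.WaitGroup
	var stopped int32
	for _, pair := range pairs {
		stopWG.Add(1)
		go func(p pairStub) {
			defer stopWG.Done()
			p.Stop(drain)
			atomic.AddInt32(&stopped, 1)
		}(pair)
	}
	stopWG.Wait() // every Stop has completed here
	return int(atomic.LoadInt32(&stopped))
}

func main() {
	fmt.Println(stopAll([]pairStub{{1}, {2}, {3}}, true))
}
```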
Force-pushed 0565e18 to c889a4a
Add capability to perform sharding with loki.write
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Piotr <[email protected]>
Force-pushed c889a4a to c55de0c
[…] queue. This would deadlock because we would not be able to drain, and a hard shutdown would not cancel it.
Nothing really stands out for the small doc changes. It's good as-is. |
PR Description
This PR implements `queue_config` for the `loki.write` component, enabling users to configure queue-based batching and parallel processing. The implementation introduces a new sharding architecture that distributes log entries across multiple parallel queues based on label fingerprints. This implementation is based on Prometheus remote-write sharding. The shards implementation is used with both "normal" clients and "WAL" clients, so we get rid of a lot of duplicated logic.

Before this PR we had a `queue_config` block that was only used when WAL was enabled. It is now always used and will affect clients regardless.

Currently no automatic "resharding" is implemented. Implementing this without the WAL will most likely be pretty primitive, so for now `min_shards` is the only configurable value until we address this.

Ideally we would move a couple of attributes from the `endpoint` block to the `queue_config` block to more closely match `prometheus.remote_write`, but we can't do that without a breaking change. These attributes are:

- `retry_on_http_429`
- `max_backoff_period`
- `min_backoff_period`
- `batch_size`
- `batch_wait`

Which issue(s) this PR fixes
Part of: #4728
Notes to the Reviewer
I moved WAL writer ownership into `client.Manager`. No need to expose it to the `loki.write` component.

I plan to work on resharding in a follow-up PR.
PR Checklist