Skip to content

Commit 70b6341

Browse files
feat: add partition ring to dataobj consumer
1 parent 5b382fa commit 70b6341

File tree

5 files changed

+258
-147
lines changed

5 files changed

+258
-147
lines changed

docs/sources/shared/configuration.md

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1252,6 +1252,66 @@ dataobj:
12521252
# CLI flag: -dataobj-consumer.lifecycler.ID
12531253
[id: <string> | default = "<hostname>"]
12541254

1255+
partition_ring:
1256+
# The key-value store used to share the hash ring across multiple
1257+
# instances. This option needs be set on ingesters, distributors,
1258+
# queriers, and rulers when running in microservices mode.
1259+
kvstore:
1260+
# Backend storage to use for the ring. Supported values are: consul,
1261+
# etcd, inmemory, memberlist, multi.
1262+
# CLI flag: -dataobj-consumer.partition-ring.store
1263+
[store: <string> | default = "memberlist"]
1264+
1265+
# The prefix for the keys in the store. Should end with a /.
1266+
# CLI flag: -dataobj-consumer.partition-ring.prefix
1267+
[prefix: <string> | default = "collectors/"]
1268+
1269+
# Configuration for a Consul client. Only applies if the selected
1270+
# kvstore is consul.
1271+
# The CLI flags prefix for this block configuration is:
1272+
# dataobj-consumer.partition-ring
1273+
[consul: <consul>]
1274+
1275+
# Configuration for an ETCD v3 client. Only applies if the selected
1276+
# kvstore is etcd.
1277+
# The CLI flags prefix for this block configuration is:
1278+
# dataobj-consumer.partition-ring
1279+
[etcd: <etcd>]
1280+
1281+
multi:
1282+
# Primary backend storage used by multi-client.
1283+
# CLI flag: -dataobj-consumer.partition-ring.multi.primary
1284+
[primary: <string> | default = ""]
1285+
1286+
# Secondary backend storage used by multi-client.
1287+
# CLI flag: -dataobj-consumer.partition-ring.multi.secondary
1288+
[secondary: <string> | default = ""]
1289+
1290+
# Mirror writes to secondary store.
1291+
# CLI flag: -dataobj-consumer.partition-ring.multi.mirror-enabled
1292+
[mirror_enabled: <boolean> | default = false]
1293+
1294+
# Timeout for storing value to secondary store.
1295+
# CLI flag: -dataobj-consumer.partition-ring.multi.mirror-timeout
1296+
[mirror_timeout: <duration> | default = 2s]
1297+
1298+
# Minimum number of owners to wait before a PENDING partition gets
1299+
# switched to ACTIVE.
1300+
# CLI flag: -dataobj-consumer.partition-ring.min-partition-owners-count
1301+
[min_partition_owners_count: <int> | default = 1]
1302+
1303+
# How long the minimum number of owners are enforced before a PENDING
1304+
# partition gets switched to ACTIVE.
1305+
# CLI flag: -dataobj-consumer.partition-ring.min-partition-owners-duration
1306+
[min_partition_owners_duration: <duration> | default = 10s]
1307+
1308+
# How long to wait before an INACTIVE partition is eligible for deletion.
1309+
# The partition is deleted only if it has been in INACTIVE state for at
1310+
# least the configured duration and it has no owners registered. A value
1311+
# of 0 disables partitions deletion.
1312+
# CLI flag: -dataobj-consumer.partition-ring.delete-inactive-partition-after
1313+
[delete_inactive_partition_after: <duration> | default = 13h]
1314+
12551315
uploader:
12561316
# The size of the SHA prefix to use for generating object storage keys for
12571317
# data objects.
@@ -2824,6 +2884,7 @@ Configuration for a Consul client. Only applies if the selected kvstore is `cons
28242884
- `common.storage.ring`
28252885
- `compactor.ring`
28262886
- `dataobj-consumer`
2887+
- `dataobj-consumer.partition-ring`
28272888
- `distributor.ring`
28282889
- `index-gateway.ring`
28292890
- `ingest-limits`
@@ -3075,6 +3136,7 @@ Configuration for an ETCD v3 client. Only applies if the selected kvstore is `et
30753136
- `common.storage.ring`
30763137
- `compactor.ring`
30773138
- `dataobj-consumer`
3139+
- `dataobj-consumer.partition-ring`
30783140
- `distributor.ring`
30793141
- `index-gateway.ring`
30803142
- `ingest-limits`
@@ -7498,6 +7560,7 @@ The TLS configuration. The supported CLI flags `<prefix>` used to reference this
74987560
- `compactor.grpc-client`
74997561
- `compactor.ring.etcd`
75007562
- `dataobj-consumer.etcd`
7563+
- `dataobj-consumer.partition-ring.etcd`
75017564
- `distributor.ring.etcd`
75027565
- `etcd`
75037566
- `frontend.grpc-client-config`

pkg/dataobj/consumer/config.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,16 @@ import (
88

99
"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
1010
"github.com/grafana/loki/v3/pkg/dataobj/uploader"
11+
"github.com/grafana/loki/v3/pkg/kafka/partitionring"
1112
util_log "github.com/grafana/loki/v3/pkg/util/log"
1213
)
1314

1415
type Config struct {
15-
BuilderConfig logsobj.BuilderConfig `yaml:"builder,omitempty"`
16-
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
17-
UploaderConfig uploader.Config `yaml:"uploader"`
18-
IdleFlushTimeout time.Duration `yaml:"idle_flush_timeout"`
16+
BuilderConfig logsobj.BuilderConfig `yaml:"builder,omitempty"`
17+
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
18+
PartitionRingConfig partitionring.Config `yaml:"partition_ring" category:"experimental"`
19+
UploaderConfig uploader.Config `yaml:"uploader"`
20+
IdleFlushTimeout time.Duration `yaml:"idle_flush_timeout"`
1921
}
2022

2123
func (cfg *Config) Validate() error {
@@ -38,6 +40,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
3840
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
3941
cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f)
4042
cfg.LifecyclerConfig.RegisterFlagsWithPrefix(prefix, f, util_log.Logger)
43+
cfg.PartitionRingConfig.RegisterFlagsWithPrefix(prefix, f)
4144
cfg.UploaderConfig.RegisterFlagsWithPrefix(prefix, f)
4245

4346
f.DurationVar(&cfg.IdleFlushTimeout, prefix+"idle-flush-timeout", 60*60*time.Second, "The maximum amount of time to wait in seconds before flushing an object that is no longer receiving new writes")

pkg/loki/config_wrapper.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,7 @@ func applyConfigToRings(r, defaults *ConfigWrapper, rc lokiring.RingConfig, merg
379379
r.DataObj.Consumer.LifecyclerConfig.ListenPort = rc.ListenPort
380380
r.DataObj.Consumer.LifecyclerConfig.ObservePeriod = rc.ObservePeriod
381381
r.DataObj.Consumer.LifecyclerConfig.EnableInet6 = rc.EnableIPv6
382+
r.DataObj.Consumer.PartitionRingConfig.KVStore = rc.KVStore
382383
}
383384
}
384385

pkg/loki/loki.go

Lines changed: 90 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -413,51 +413,53 @@ type Loki struct {
413413
deps map[string][]string
414414
SignalHandler *signals.Handler
415415

416-
Server *server.Server
417-
InternalServer *server.Server
418-
UI *ui.Service
419-
ring *ring.Ring
420-
Overrides limiter.CombinedLimits
421-
tenantConfigs *runtime.TenantConfigs
422-
TenantLimits validation.TenantLimits
423-
distributor *distributor.Distributor
424-
ingestLimits *limits.Service
425-
ingestLimitsRing *ring.Ring
426-
ingestLimitsFrontend *limits_frontend.Frontend
427-
ingestLimitsFrontendRing *ring.Ring
428-
Ingester ingester.Interface
429-
PatternIngester *pattern.Ingester
430-
PatternRingClient pattern.RingClient
431-
Querier querier.Querier
432-
cacheGenerationLoader queryrangebase.CacheGenNumberLoader
433-
querierAPI *querier.QuerierAPI
434-
ingesterQuerier *querier.IngesterQuerier
435-
Store storage.Store
436-
BloomStore bloomshipper.Store
437-
bloomGatewayClient bloomgateway.Client
438-
tableManager *index.TableManager
439-
frontend Frontend
440-
ruler *base_ruler.Ruler
441-
ruleEvaluator ruler.Evaluator
442-
RulerStorage rulestore.RuleStore
443-
rulerAPI *base_ruler.API
444-
stopper queryrange.Stopper
445-
runtimeConfig *runtimeconfig.Manager
446-
MemberlistKV *memberlist.KVInitService
447-
compactor *compactor.Compactor
448-
QueryFrontEndMiddleware queryrangebase.Middleware
449-
queryScheduler *scheduler.Scheduler
450-
querySchedulerRingManager *lokiring.RingManager
451-
usageReport *analytics.Reporter
452-
indexGatewayRingManager *lokiring.RingManager
453-
PartitionRingWatcher *ring.PartitionRingWatcher
454-
partitionRing *ring.PartitionInstanceRing
455-
blockBuilder *blockbuilder.BlockBuilder
456-
blockScheduler *blockscheduler.BlockScheduler
457-
dataObjConsumer *consumer.Service
458-
dataObjConsumerRing *ring.Ring
459-
dataObjIndexBuilder *dataobjindex.Builder
460-
scratchStore scratch.Store
416+
Server *server.Server
417+
InternalServer *server.Server
418+
UI *ui.Service
419+
ring *ring.Ring
420+
Overrides limiter.CombinedLimits
421+
tenantConfigs *runtime.TenantConfigs
422+
TenantLimits validation.TenantLimits
423+
distributor *distributor.Distributor
424+
ingestLimits *limits.Service
425+
ingestLimitsRing *ring.Ring
426+
ingestLimitsFrontend *limits_frontend.Frontend
427+
ingestLimitsFrontendRing *ring.Ring
428+
Ingester ingester.Interface
429+
PatternIngester *pattern.Ingester
430+
PatternRingClient pattern.RingClient
431+
Querier querier.Querier
432+
cacheGenerationLoader queryrangebase.CacheGenNumberLoader
433+
querierAPI *querier.QuerierAPI
434+
ingesterQuerier *querier.IngesterQuerier
435+
Store storage.Store
436+
BloomStore bloomshipper.Store
437+
bloomGatewayClient bloomgateway.Client
438+
tableManager *index.TableManager
439+
frontend Frontend
440+
ruler *base_ruler.Ruler
441+
ruleEvaluator ruler.Evaluator
442+
RulerStorage rulestore.RuleStore
443+
rulerAPI *base_ruler.API
444+
stopper queryrange.Stopper
445+
runtimeConfig *runtimeconfig.Manager
446+
MemberlistKV *memberlist.KVInitService
447+
compactor *compactor.Compactor
448+
QueryFrontEndMiddleware queryrangebase.Middleware
449+
queryScheduler *scheduler.Scheduler
450+
querySchedulerRingManager *lokiring.RingManager
451+
usageReport *analytics.Reporter
452+
indexGatewayRingManager *lokiring.RingManager
453+
PartitionRingWatcher *ring.PartitionRingWatcher
454+
partitionRing *ring.PartitionInstanceRing
455+
blockBuilder *blockbuilder.BlockBuilder
456+
blockScheduler *blockscheduler.BlockScheduler
457+
dataObjConsumer *consumer.Service
458+
dataObjConsumerRing *ring.Ring
459+
dataObjConsumerPartitionRing *ring.PartitionInstanceRing
460+
DataObjConsumerPartitionRingWatcher *ring.PartitionRingWatcher
461+
dataObjIndexBuilder *dataobjindex.Builder
462+
scratchStore scratch.Store
461463

462464
ClientMetrics storage.ClientMetrics
463465
deleteClientMetrics *deletion.DeleteRequestClientMetrics
@@ -800,6 +802,7 @@ func (t *Loki) setupModuleManager() error {
800802
mm.RegisterModule(DataObjExplorer, t.initDataObjExplorer)
801803
mm.RegisterModule(UI, t.initUI)
802804
mm.RegisterModule(DataObjConsumerRing, t.initDataObjConsumerRing)
805+
mm.RegisterModule(DataObjConsumerPartitionRing, t.initDataObjConsumerPartitionRing)
803806
mm.RegisterModule(DataObjConsumer, t.initDataObjConsumer)
804807
mm.RegisterModule(DataObjIndexBuilder, t.initDataObjIndexBuilder)
805808
mm.RegisterModule(ScratchStore, t.initScratchStore)
@@ -811,47 +814,48 @@ func (t *Loki) setupModuleManager() error {
811814

812815
// Add dependencies
813816
deps := map[string][]string{
814-
Ring: {RuntimeConfig, Server, MemberlistKV},
815-
Analytics: {},
816-
Overrides: {RuntimeConfig},
817-
OverridesExporter: {Overrides, Server, UI},
818-
TenantConfigs: {RuntimeConfig},
819-
UI: {Server},
820-
Distributor: {Ring, Server, Overrides, TenantConfigs, PatternRingClient, PatternIngesterTee, Analytics, PartitionRing, IngestLimitsFrontendRing, DataObjConsumerRing, UI},
821-
IngestLimitsRing: {RuntimeConfig, Server, MemberlistKV},
822-
IngestLimits: {MemberlistKV, Overrides, Server},
823-
IngestLimitsFrontend: {IngestLimitsRing, Overrides, Server, MemberlistKV},
824-
IngestLimitsFrontendRing: {RuntimeConfig, Server, MemberlistKV},
825-
Store: {Overrides, IndexGatewayRing},
826-
Ingester: {Store, Server, MemberlistKV, TenantConfigs, Analytics, PartitionRing, UI},
827-
Querier: {Store, Ring, Server, IngesterQuerier, PatternRingClient, Overrides, Analytics, CacheGenerationLoader, QuerySchedulerRing, UI},
828-
QueryFrontendTripperware: {Server, Overrides, TenantConfigs},
829-
QueryFrontend: {QueryFrontendTripperware, Analytics, CacheGenerationLoader, QuerySchedulerRing, UI},
830-
QueryScheduler: {Server, Overrides, MemberlistKV, Analytics, QuerySchedulerRing, UI},
831-
Ruler: {Ring, Server, RulerStorage, RuleEvaluator, Overrides, TenantConfigs, Analytics, UI},
832-
RuleEvaluator: {Ring, Server, Store, IngesterQuerier, Overrides, TenantConfigs, Analytics},
833-
TableManager: {Server, Analytics, UI},
834-
Compactor: {Server, Overrides, MemberlistKV, Analytics, UI},
835-
IndexGateway: {Server, Store, BloomStore, IndexGatewayRing, IndexGatewayInterceptors, Analytics, UI},
836-
BloomGateway: {Server, BloomStore, Analytics, UI},
837-
BloomPlanner: {Server, BloomStore, Analytics, Store, UI},
838-
BloomBuilder: {Server, BloomStore, Analytics, Store, UI},
839-
BloomStore: {IndexGatewayRing, BloomGatewayClient},
840-
PatternRingClient: {Server, MemberlistKV, Analytics},
841-
PatternIngesterTee: {Server, Overrides, MemberlistKV, Analytics, PatternRingClient},
842-
PatternIngester: {Server, MemberlistKV, Analytics, PatternRingClient, PatternIngesterTee, Overrides, UI},
843-
IngesterQuerier: {Ring, PartitionRing, Overrides},
844-
QuerySchedulerRing: {Overrides, MemberlistKV},
845-
IndexGatewayRing: {Overrides, MemberlistKV},
846-
PartitionRing: {MemberlistKV, Server, Ring},
847-
MemberlistKV: {Server},
848-
BlockBuilder: {PartitionRing, Store, Server, UI},
849-
BlockScheduler: {Server, UI},
850-
DataObjExplorer: {Server, UI},
851-
DataObjConsumerRing: {RuntimeConfig, Server, MemberlistKV},
852-
DataObjConsumer: {MemberlistKV, ScratchStore, PartitionRing, Server, UI},
853-
DataObjIndexBuilder: {ScratchStore, Server, UI},
854-
ScratchStore: {},
817+
Ring: {RuntimeConfig, Server, MemberlistKV},
818+
Analytics: {},
819+
Overrides: {RuntimeConfig},
820+
OverridesExporter: {Overrides, Server, UI},
821+
TenantConfigs: {RuntimeConfig},
822+
UI: {Server},
823+
Distributor: {Ring, Server, Overrides, TenantConfigs, PatternRingClient, PatternIngesterTee, Analytics, PartitionRing, IngestLimitsFrontendRing, DataObjConsumerRing, UI},
824+
IngestLimitsRing: {RuntimeConfig, Server, MemberlistKV},
825+
IngestLimits: {MemberlistKV, Overrides, Server},
826+
IngestLimitsFrontend: {IngestLimitsRing, Overrides, Server, MemberlistKV},
827+
IngestLimitsFrontendRing: {RuntimeConfig, Server, MemberlistKV},
828+
Store: {Overrides, IndexGatewayRing},
829+
Ingester: {Store, Server, MemberlistKV, TenantConfigs, Analytics, PartitionRing, UI},
830+
Querier: {Store, Ring, Server, IngesterQuerier, PatternRingClient, Overrides, Analytics, CacheGenerationLoader, QuerySchedulerRing, UI},
831+
QueryFrontendTripperware: {Server, Overrides, TenantConfigs},
832+
QueryFrontend: {QueryFrontendTripperware, Analytics, CacheGenerationLoader, QuerySchedulerRing, UI},
833+
QueryScheduler: {Server, Overrides, MemberlistKV, Analytics, QuerySchedulerRing, UI},
834+
Ruler: {Ring, Server, RulerStorage, RuleEvaluator, Overrides, TenantConfigs, Analytics, UI},
835+
RuleEvaluator: {Ring, Server, Store, IngesterQuerier, Overrides, TenantConfigs, Analytics},
836+
TableManager: {Server, Analytics, UI},
837+
Compactor: {Server, Overrides, MemberlistKV, Analytics, UI},
838+
IndexGateway: {Server, Store, BloomStore, IndexGatewayRing, IndexGatewayInterceptors, Analytics, UI},
839+
BloomGateway: {Server, BloomStore, Analytics, UI},
840+
BloomPlanner: {Server, BloomStore, Analytics, Store, UI},
841+
BloomBuilder: {Server, BloomStore, Analytics, Store, UI},
842+
BloomStore: {IndexGatewayRing, BloomGatewayClient},
843+
PatternRingClient: {Server, MemberlistKV, Analytics},
844+
PatternIngesterTee: {Server, Overrides, MemberlistKV, Analytics, PatternRingClient},
845+
PatternIngester: {Server, MemberlistKV, Analytics, PatternRingClient, PatternIngesterTee, Overrides, UI},
846+
IngesterQuerier: {Ring, PartitionRing, Overrides},
847+
QuerySchedulerRing: {Overrides, MemberlistKV},
848+
IndexGatewayRing: {Overrides, MemberlistKV},
849+
PartitionRing: {MemberlistKV, Server, Ring},
850+
MemberlistKV: {Server},
851+
BlockBuilder: {PartitionRing, Store, Server, UI},
852+
BlockScheduler: {Server, UI},
853+
DataObjExplorer: {Server, UI},
854+
DataObjConsumerRing: {RuntimeConfig, Server, MemberlistKV},
855+
DataObjConsumerPartitionRing: {MemberlistKV, Server, Ring},
856+
DataObjConsumer: {MemberlistKV, ScratchStore, DataObjConsumerPartitionRing, Server, UI},
857+
DataObjIndexBuilder: {ScratchStore, Server, UI},
858+
ScratchStore: {},
855859

856860
Read: {QueryFrontend, Querier},
857861
Write: {Ingester, Distributor, PatternIngester},

0 commit comments

Comments
 (0)