66
77 "github.com/go-kit/log"
88 "github.com/go-kit/log/level"
9+ "github.com/grafana/dskit/kv"
910 "github.com/grafana/dskit/ring"
1011 "github.com/grafana/dskit/services"
1112 "github.com/prometheus/client_golang/prometheus"
@@ -15,22 +16,26 @@ import (
1516 "github.com/grafana/loki/v3/pkg/dataobj/metastore"
1617 "github.com/grafana/loki/v3/pkg/kafka"
1718 "github.com/grafana/loki/v3/pkg/kafka/client"
19+ "github.com/grafana/loki/v3/pkg/kafka/partitionring"
1820 "github.com/grafana/loki/v3/pkg/scratch"
1921)
2022
2123const (
22- RingKey = "dataobj-consumer"
23- RingName = "dataobj-consumer"
24+ RingKey = "dataobj-consumer"
25+ RingName = "dataobj-consumer"
26+ PartitionRingKey = "dataobj-consumer-partitions-key"
27+ PartitionRingName = "dataobj-consumer-partitions"
2428)
2529
2630type Service struct {
2731 services.Service
28- cfg Config
29- metastoreEvents * kgo.Client
30- lifecycler * ring.Lifecycler
31- watcher * services.FailureWatcher
32- logger log.Logger
33- reg prometheus.Registerer
32+ cfg Config
33+ metastoreEvents * kgo.Client
34+ lifecycler * ring.Lifecycler
35+ partitionInstanceLifecycler * ring.PartitionInstanceLifecycler
36+ watcher * services.FailureWatcher
37+ logger log.Logger
38+ reg prometheus.Registerer
3439}
3540
3641func New (kafkaCfg kafka.Config , cfg Config , _ metastore.Config , _ objstore.Bucket , _ scratch.Store , _ string , _ ring.PartitionRingReader , reg prometheus.Registerer , logger log.Logger ) (* Service , error ) {
@@ -69,20 +74,57 @@ func New(kafkaCfg kafka.Config, cfg Config, _ metastore.Config, _ objstore.Bucke
6974 }
7075 s .lifecycler = lifecycler
7176
77+ // An instance must register itself in the partition ring. Each instance
78+ // is responsible for consuming exactly one partition determined by
79+ // its partition ID. Once ready, the instance will declare its partition
80+ // as active in the partition ring. This is how distributors know which
81+ // partitions have a ready consumer.
82+ partitionID , err := partitionring .ExtractPartitionID (cfg .LifecyclerConfig .ID )
83+ if err != nil {
84+ return nil , fmt .Errorf ("failed to extract partition ID from lifecycler configuration: %w" , err )
85+ }
86+ partitionRingKV := cfg .PartitionRingConfig .KVStore .Mock
87+ // The mock KV is used in tests. If this is not a test then we must
88+ // initialize a real kv.
89+ if partitionRingKV == nil {
90+ partitionRingKV , err = kv .NewClient (
91+ cfg .PartitionRingConfig .KVStore ,
92+ ring .GetPartitionRingCodec (),
93+ kv .RegistererWithKVName (reg , "dataobj-consumer-lifecycler" ),
94+ logger ,
95+ )
96+ if err != nil {
97+ return nil , fmt .Errorf ("failed to set up partition ring: %w" , err )
98+ }
99+ }
100+ partitionInstanceLifecycler := ring .NewPartitionInstanceLifecycler (
101+ cfg .PartitionRingConfig .ToLifecyclerConfig (partitionID , cfg .LifecyclerConfig .ID ),
102+ PartitionRingName ,
103+ PartitionRingKey ,
104+ partitionRingKV ,
105+ logger ,
106+ prometheus .WrapRegistererWithPrefix ("loki_" , reg ))
107+ s .partitionInstanceLifecycler = partitionInstanceLifecycler
108+
72109 watcher := services .NewFailureWatcher ()
73110 watcher .WatchService (lifecycler )
111+ watcher .WatchService (partitionInstanceLifecycler )
74112 s .watcher = watcher
75113
76114 s .Service = services .NewBasicService (s .starting , s .running , s .stopping )
77115 return s , nil
78116}
79117
80118// starting implements the Service interface's starting method.
81- func (s * Service ) starting (ctx context.Context ) ( err error ) {
119+ func (s * Service ) starting (ctx context.Context ) error {
82120 level .Info (s .logger ).Log ("msg" , "starting" )
83121 if err := services .StartAndAwaitRunning (ctx , s .lifecycler ); err != nil {
84122 return fmt .Errorf ("failed to start lifecycler: %w" , err )
85123 }
124+ if err := services .StartAndAwaitRunning (ctx , s .partitionInstanceLifecycler ); err != nil {
125+ return fmt .Errorf ("failed to start partition instance lifecycler: %w" , err )
126+ }
127+
86128 return nil
87129}
88130
@@ -96,6 +138,9 @@ func (s *Service) running(ctx context.Context) error {
96138func (s * Service ) stopping (failureCase error ) error {
97139 level .Info (s .logger ).Log ("msg" , "stopping" )
98140 ctx := context .TODO ()
141+ if err := services .StopAndAwaitTerminated (ctx , s .partitionInstanceLifecycler ); err != nil {
142+ level .Warn (s .logger ).Log ("msg" , "failed to stop partition instance lifecycler" , "err" , err )
143+ }
99144 if err := services .StopAndAwaitTerminated (ctx , s .lifecycler ); err != nil {
100145 level .Warn (s .logger ).Log ("msg" , "failed to stop lifecycler" , "err" , err )
101146 }
0 commit comments