kafka replay speed: add support for metadata & source
Signed-off-by: Dimitar Dimitrov <[email protected]>
dimitarvdimitrov committed Sep 19, 2024
1 parent 13b5f99 commit 043c3b3
Showing 3 changed files with 351 additions and 28 deletions.
67 changes: 50 additions & 17 deletions pkg/storage/ingest/pusher.go
@@ -285,7 +285,7 @@ type parallelStoragePusher struct {
metrics *pusherConsumerMetrics
logger log.Logger

pushers map[string]*parallelStorageShards
pushers map[string]map[mimirpb.WriteRequest_SourceEnum]*parallelStorageShards
upstreamPusher Pusher
numShards int
batchSize int
@@ -295,7 +295,7 @@ type parallelStoragePusher struct {
func newParallelStoragePusher(metrics *pusherConsumerMetrics, pusher Pusher, numShards int, batchSize int, logger log.Logger) *parallelStoragePusher {
return &parallelStoragePusher{
logger: log.With(logger, "component", "parallel-storage-pusher"),
pushers: make(map[string]*parallelStorageShards),
pushers: make(map[string]map[mimirpb.WriteRequest_SourceEnum]*parallelStorageShards),
upstreamPusher: pusher,
numShards: numShards,
batchSize: batchSize,
@@ -310,30 +310,35 @@ func (c parallelStoragePusher) PushToStorage(ctx context.Context, wr *mimirpb.Wr
level.Error(c.logger).Log("msg", "failed to extract tenant ID from context", "err", err)
}

shards := c.shardsFor(userID)
shards := c.shardsFor(userID, wr.Source)
return shards.ShardWriteRequest(ctx, wr)
}

// Close implements the PusherCloser interface.
func (c parallelStoragePusher) Close() []error {
var errs multierror.MultiError
for _, p := range c.pushers {
errs.Add(p.Stop())
for _, sources := range c.pushers {
for _, p := range sources {
errs.Add(p.Stop())
}
}
clear(c.pushers)
return errs
}

// shardsFor returns the parallelStorageShards for the given userID and request source. Once created, the same shards are re-used for the same (userID, source) pair.
// We create a separate set of shards per tenant and per source to parallelize the writes.
func (c parallelStoragePusher) shardsFor(userID string) *parallelStorageShards {
if p := c.pushers[userID]; p != nil {
func (c parallelStoragePusher) shardsFor(userID string, requestSource mimirpb.WriteRequest_SourceEnum) *parallelStorageShards {
if p := c.pushers[userID][requestSource]; p != nil {
return p
}
// Use the same hashing function that's used for stripes in the TSDB. That way we make use of the low-contention property of stripes.
hashLabels := labels.Labels.Hash
p := newParallelStorageShards(c.metrics.numTimeSeriesPerFlush, c.numShards, c.batchSize, shardForSeriesBuffer, c.upstreamPusher, hashLabels)
c.pushers[userID] = p
if c.pushers[userID] == nil {
c.pushers[userID] = make(map[mimirpb.WriteRequest_SourceEnum]*parallelStorageShards)
}
c.pushers[userID][requestSource] = p
return p
}
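
For context, the lookup-or-create pattern above keys shard pools by both tenant and request source (in Mimir the source distinguishes, for example, API writes from ruler-generated writes). Below is a minimal, self-contained sketch of the same nested-map pattern; the shardPool and registry types are illustrative stand-ins, not names from this commit.

package main

import "fmt"

// shardPool is a hypothetical stand-in for parallelStorageShards.
type shardPool struct {
	tenant string
	source int
}

// registry mirrors the pushers field of parallelStoragePusher: a map keyed
// first by tenant, then by request source.
type registry struct {
	pools map[string]map[int]*shardPool
}

// poolFor looks up the pool for the (tenant, source) pair and lazily creates
// the inner map and the pool on first use, mirroring shardsFor above.
func (r *registry) poolFor(tenant string, source int) *shardPool {
	if p := r.pools[tenant][source]; p != nil { // indexing a nil inner map safely yields nil
		return p
	}
	if r.pools[tenant] == nil {
		r.pools[tenant] = make(map[int]*shardPool)
	}
	p := &shardPool{tenant: tenant, source: source}
	r.pools[tenant][source] = p
	return p
}

func main() {
	r := &registry{pools: make(map[string]map[int]*shardPool)}
	a := r.poolFor("tenant-1", 0) // e.g. API traffic
	b := r.poolFor("tenant-1", 1) // e.g. ruler traffic
	fmt.Println(a != b)           // true: each source gets its own set of shards
}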

@@ -378,6 +383,7 @@ func newParallelStorageShards(numTimeSeriesPerFlush prometheus.Histogram, numSha
}

// ShardWriteRequest hashes each time series in the write request and sends it to the appropriate shard, which is then handled by that shard's batchingQueue.
// ShardWriteRequest ignores SkipLabelNameValidation because that field is only used in the distributor and not in the ingester.
func (p *parallelStorageShards) ShardWriteRequest(ctx context.Context, request *mimirpb.WriteRequest) error {
var (
builder labels.ScratchBuilder
@@ -391,7 +397,21 @@ func (p *parallelStorageShards) ShardWriteRequest(ctx context.Context, request *

// TODO: Add metrics to measure how long are items sitting in the queue before they are flushed.
// TODO dimitarvdimitrov support metadata and the rest of the fields; perhaps cut a new request for different values of SkipLabelNameValidation?
if err := p.shards[shard].AddToBatch(ctx, ts); err != nil {
if err := p.shards[shard].AddToBatch(ctx, request.Source, ts); err != nil {
// TODO: Technically, we should determine at this point what type of error it is and abort the whole push if it's a server error.
// We'll do that in the next PR as otherwise it's too many changes right now.
if !mimirpb.IsClientError(err) {
return err
}

errs.Add(err)
}
}

// Push metadata to every shard in a round-robin fashion.
shard := 0
for mdIdx := range request.Metadata {
if err := p.shards[shard].AddMetadataToBatch(ctx, request.Source, request.Metadata[mdIdx]); err != nil {
// TODO: Technically, we should determine at this point what type of error it is and abort the whole push if it's a server error.
// We'll do that in the next PR as otherwise it's too many changes right now.
if !mimirpb.IsClientError(err) {
@@ -400,6 +420,8 @@ func (p *parallelStorageShards) ShardWriteRequest(ctx context.Context, request *

errs.Add(err)
}
shard++
shard %= p.numShards
}

// We might have some data left in some of the queues in the shards, but they will be flushed eventually once Stop is called, and we're certain that no more data is coming.
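
To summarize the two routing strategies in ShardWriteRequest: time series are routed by a hash of their labels, so a given series always lands on the same shard, while metadata entries carry no labels and are spread round-robin across shards. A minimal sketch with placeholder data (not the commit's types) follows.

package main

import "fmt"

func main() {
	const numShards = 4

	// Time series are assigned a shard from the hash of their labels, so the
	// same series is always appended by the same shard.
	seriesHashes := []uint64{17, 42, 17, 99} // stand-ins for labels.Labels.Hash results
	for _, h := range seriesHashes {
		fmt.Printf("series with hash %d -> shard %d\n", h, h%numShards)
	}

	// Metadata has no labels to hash, so it is distributed round-robin.
	metadata := []string{"metric_a", "metric_b", "metric_c", "metric_d", "metric_e"}
	shard := 0
	for _, md := range metadata {
		fmt.Printf("metadata %s -> shard %d\n", md, shard)
		shard++
		shard %= numShards
	}
}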
@@ -471,24 +493,28 @@ func newBatchingQueue(capacity int, batchSize int) *batchingQueue {

// AddToBatch adds a time series to the current batch. If the batch size is reached, the batch is pushed to the Channel().
// If an error occurs while pushing the batch, it returns the error and ensures the batch is pushed.
func (q *batchingQueue) AddToBatch(ctx context.Context, ts mimirpb.PreallocTimeseries) error {
func (q *batchingQueue) AddToBatch(ctx context.Context, source mimirpb.WriteRequest_SourceEnum, ts mimirpb.PreallocTimeseries) error {
q.currentBatch.Timeseries = append(q.currentBatch.Timeseries, ts)
q.currentBatch.Context = ctx
q.currentBatch.Source = source

if len(q.currentBatch.Timeseries) >= q.batchSize {
if err := q.push(); err != nil {
return err
}
}
return q.pushIfFull()
}

return nil
// AddMetadataToBatch adds metadata to the current batch.
func (q *batchingQueue) AddMetadataToBatch(ctx context.Context, source mimirpb.WriteRequest_SourceEnum, metadata *mimirpb.MetricMetadata) error {
q.currentBatch.Metadata = append(q.currentBatch.Metadata, metadata)
q.currentBatch.Context = ctx
q.currentBatch.Source = source

return q.pushIfFull()
}

// Close closes the batchingQueue. It pushes the current batch to the channel if it's not empty,
// and then closes the channel.
func (q *batchingQueue) Close() error {
var errs multierror.MultiError
if len(q.currentBatch.Timeseries) > 0 {
if len(q.currentBatch.Timeseries)+len(q.currentBatch.Metadata) > 0 {
if err := q.push(); err != nil {
errs.Add(err)
}
@@ -518,6 +544,13 @@ func (q *batchingQueue) Done() {
close(q.done)
}

func (q *batchingQueue) pushIfFull() error {
if len(q.currentBatch.Metadata)+len(q.currentBatch.Timeseries) >= q.batchSize {
return q.push()
}
return nil
}
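
The flush condition now counts time series and metadata together, so a batch also fills up (and gets pushed) on metadata-only traffic. A minimal sketch of that combined-count behaviour follows; the types are simplified placeholders rather than the real *mimirpb.WriteRequest batch.

package main

import "fmt"

type batch struct {
	series   []string
	metadata []string
}

type queue struct {
	batchSize int
	out       chan batch
	current   batch
}

func (q *queue) addSeries(s string) {
	q.current.series = append(q.current.series, s)
	q.pushIfFull()
}

func (q *queue) addMetadata(m string) {
	q.current.metadata = append(q.current.metadata, m)
	q.pushIfFull()
}

// pushIfFull flushes once the combined count of series and metadata reaches
// the batch size, mirroring batchingQueue.pushIfFull above.
func (q *queue) pushIfFull() {
	if len(q.current.series)+len(q.current.metadata) >= q.batchSize {
		q.out <- q.current
		q.current = batch{}
	}
}

func main() {
	q := &queue{batchSize: 3, out: make(chan batch, 1)}
	q.addSeries("up")
	q.addMetadata("up: gauge")
	q.addSeries("http_requests_total") // the third item triggers a flush
	fmt.Println(len(q.out))            // 1
}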

// push pushes the current batch to the channel and resets the current batch.
// It also collects any errors that might have occurred while pushing the batch.
func (q *batchingQueue) push() error {
