feat: record metrics from rules and export to remote (#3861)
* feat: record metrics from rules and export to remote

* use CreatedBy instead of workerId

* complete matcher and state recorder

* observer pattern

* flush async, rename MetricsObserver to SampleObserver, agent name

* error handling

* Init (tenant injection)

* typo

* get rid of TimeSeries redundant type

* merging recorder into observer. renaming observer. static empty exporter

* remove redundant AggregatedFingerprint

* PR comments: SRP, testing, env vars, feature flag, ...

* control nil exporter in flush

* improve error for rules unmarshal

* settingsv1 import rename

* observer code comments

* minor changes

* fix *prompb.x, move recording rules funcs

* use encoding/json

* fix test

* fix test by using json logger

* metrics for exporter client (#3934)

* metrics for exporter client

* use promhttp instrumented round tripper (see the sketch after this list)

* remove custom round tripper, add TODO
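
The exporter-client metrics from the follow-up change (#3934) reuse promhttp's instrumented round trippers rather than a custom round tripper. A minimal sketch of that pattern, assuming illustrative metric names rather than the ones Pyroscope actually registers:

package example

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// newInstrumentedClient wraps the default transport so every request made
// by the exporter client is counted and timed, partitioned by status code
// and HTTP method.
func newInstrumentedClient(reg prometheus.Registerer) *http.Client {
	requests := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "exporter_client_requests_total",
		Help: "Requests sent by the metrics exporter client.",
	}, []string{"code", "method"})
	duration := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name: "exporter_client_request_duration_seconds",
		Help: "Duration of exporter client requests.",
	}, []string{"code", "method"})
	reg.MustRegister(requests, duration)
	rt := promhttp.InstrumentRoundTripperCounter(requests,
		promhttp.InstrumentRoundTripperDuration(duration, http.DefaultTransport))
	return &http.Client{Transport: rt}
}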
alsoba13 authored Feb 24, 2025
1 parent cf6e7a4 commit 7744262
Showing 14 changed files with 1,059 additions and 22 deletions.
4 changes: 4 additions & 0 deletions .mockery.yaml
@@ -53,3 +53,7 @@ packages:
interfaces:
RaftNodeServiceClient:
RaftNodeServiceServer:
github.com/grafana/pyroscope/pkg/experiment/metrics:
interfaces:
Exporter:
Ruler:
40 changes: 34 additions & 6 deletions pkg/experiment/block/compaction.go
@@ -52,11 +52,24 @@ func WithCompactionDestination(storage objstore.Bucket) CompactionOption {
}
}

func WithSampleObserver(observer SampleObserver) CompactionOption {
return func(p *compactionConfig) {
p.sampleObserver = observer
}
}

type compactionConfig struct {
objectOptions []ObjectOption
source objstore.BucketReader
destination objstore.Bucket
tempdir string
objectOptions []ObjectOption
source objstore.BucketReader
destination objstore.Bucket
tempdir string
sampleObserver SampleObserver
}

type SampleObserver interface {
// Observe is called before the compactor appends the entry
// to the output block. This method must not modify the entry.
Observe(ProfileEntry)
}

func Compact(
@@ -89,7 +102,7 @@ func Compact(

compacted := make([]*metastorev1.BlockMeta, 0, len(plan))
for _, p := range plan {
md, compactionErr := p.Compact(ctx, c.destination, c.tempdir)
md, compactionErr := p.Compact(ctx, c.destination, c.tempdir, c.sampleObserver)
if compactionErr != nil {
return nil, compactionErr
}
@@ -187,7 +200,12 @@ func newBlockCompaction(
return p
}

func (b *CompactionPlan) Compact(ctx context.Context, dst objstore.Bucket, tempdir string) (m *metastorev1.BlockMeta, err error) {
func (b *CompactionPlan) Compact(
ctx context.Context,
dst objstore.Bucket,
tempdir string,
observer SampleObserver,
) (m *metastorev1.BlockMeta, err error) {
w, err := NewBlockWriter(tempdir)
if err != nil {
return nil, fmt.Errorf("creating block writer: %w", err)
@@ -199,6 +217,7 @@ func (b *CompactionPlan) Compact(ctx context.Context, dst objstore.Bucket, tempd
// Datasets are compacted in a strict order.
for i, s := range b.datasets {
b.datasetIndex.setIndex(uint32(i))
s.registerSampleObserver(observer)
if err = s.compact(ctx, w); err != nil {
return nil, fmt.Errorf("compacting block: %w", err)
}
@@ -284,6 +303,8 @@ type datasetCompaction struct {
profiles uint64

flushOnce sync.Once

observer SampleObserver
}

func (b *CompactionPlan) newDatasetCompaction(tenant, name int32) *datasetCompaction {
@@ -349,6 +370,10 @@ func (m *datasetCompaction) compact(ctx context.Context, w *Writer) (err error)
return nil
}

func (m *datasetCompaction) registerSampleObserver(observer SampleObserver) {
m.observer = observer
}

func (m *datasetCompaction) open(ctx context.Context, w io.Writer) (err error) {
var estimatedProfileTableSize int64
for _, ds := range m.datasets {
@@ -416,6 +441,9 @@ func (m *datasetCompaction) writeRow(r ProfileEntry) (err error) {
if err = m.symbolsRewriter.rewriteRow(r); err != nil {
return err
}
if m.observer != nil {
m.observer.Observe(r)
}
return m.profilesWriter.writeRow(r)
}

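The SampleObserver hook added above is deliberately narrow: the compactor calls Observe once per profile row, just before the row is appended to the output block, and the observer must not mutate the entry. A minimal, hypothetical observer satisfying the interface (the real metrics.SampleObserver evaluates recording rules and stages samples for export):

package example

import "github.com/grafana/pyroscope/pkg/experiment/block"

// countingObserver is a toy block.SampleObserver that only counts the
// rows it sees; per the interface contract it never modifies the entry.
type countingObserver struct {
	rows uint64
}

func (o *countingObserver) Observe(e block.ProfileEntry) {
	o.rows++
}

It would be wired in through the new functional option, e.g. block.Compact(ctx, blocks, bucket, block.WithSampleObserver(&countingObserver{})), mirroring what the compaction worker does below.
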
72 changes: 59 additions & 13 deletions pkg/experiment/compactor/compaction_worker.go
@@ -2,6 +2,7 @@ package compactor

import (
"context"
"encoding/binary"
"flag"
"fmt"
"os"
@@ -13,18 +14,22 @@ import (
"sync/atomic"
"time"

"github.com/cespare/xxhash/v2"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
_ "go.uber.org/automaxprocs"
"golang.org/x/sync/errgroup"

metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
"github.com/grafana/pyroscope/pkg/experiment/block"
"github.com/grafana/pyroscope/pkg/experiment/block/metadata"
"github.com/grafana/pyroscope/pkg/experiment/metrics"
"github.com/grafana/pyroscope/pkg/objstore"
"github.com/grafana/pyroscope/pkg/util"
)
@@ -36,7 +41,7 @@ type Worker struct {
config Config
client MetastoreClient
storage objstore.Bucket
metrics *metrics
metrics *compactionWorkerMetrics

jobs map[string]*compactionJob
queue chan *compactionJob
@@ -46,14 +51,18 @@ type Worker struct {
stopped atomic.Bool
closeOnce sync.Once
wg sync.WaitGroup

exporter metrics.Exporter
ruler metrics.Ruler
}

type Config struct {
JobConcurrency int `yaml:"job_capacity"`
JobPollInterval time.Duration `yaml:"job_poll_interval"`
SmallObjectSize int `yaml:"small_object_size_bytes"`
TempDir string `yaml:"temp_dir"`
RequestTimeout time.Duration `yaml:"request_timeout"`
JobConcurrency int `yaml:"job_capacity"`
JobPollInterval time.Duration `yaml:"job_poll_interval"`
SmallObjectSize int `yaml:"small_object_size_bytes"`
TempDir string `yaml:"temp_dir"`
RequestTimeout time.Duration `yaml:"request_timeout"`
MetricsExporterEnabled bool `yaml:"metrics_exporter_enabled"`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -63,6 +72,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.RequestTimeout, prefix+"request-timeout", 5*time.Second, "Job request timeout.")
f.IntVar(&cfg.SmallObjectSize, prefix+"small-object-size-bytes", 8<<20, "Size of the object that can be loaded in memory.")
f.StringVar(&cfg.TempDir, prefix+"temp-dir", os.TempDir(), "Temporary directory for compaction jobs.")
f.BoolVar(&cfg.MetricsExporterEnabled, prefix+"metrics-exporter.enabled", false, "Enables the metrics exporter.")
}

type compactionJob struct {
@@ -100,18 +110,22 @@ func New(
client MetastoreClient,
storage objstore.Bucket,
reg prometheus.Registerer,
ruler metrics.Ruler,
exporter metrics.Exporter,
) (*Worker, error) {
config.TempDir = filepath.Join(filepath.Clean(config.TempDir), "pyroscope-compactor")
_ = os.RemoveAll(config.TempDir)
if err := os.MkdirAll(config.TempDir, 0o777); err != nil {
return nil, fmt.Errorf("failed to create compactor directory: %w", err)
}
w := &Worker{
config: config,
logger: logger,
client: client,
storage: storage,
metrics: newMetrics(reg),
config: config,
logger: logger,
client: client,
storage: storage,
metrics: newMetrics(reg),
ruler: ruler,
exporter: exporter,
}
w.threads = config.JobConcurrency
if w.threads < 1 {
@@ -176,6 +190,11 @@ func (w *Worker) running(ctx context.Context) error {
ticker.Stop()
close(stopPolling)
<-pollingDone
// Force the exporter to send all staged samples (implementation-dependent).
// Must be a blocking call.
if w.exporter != nil {
w.exporter.Flush()
}
return nil
}

@@ -394,12 +413,20 @@ func (w *Worker) runCompaction(job *compactionJob) {

tempdir := filepath.Join(w.config.TempDir, job.Name)
sourcedir := filepath.Join(tempdir, "source")
compacted, err := block.Compact(ctx, job.blocks, w.storage,
options := []block.CompactionOption{
block.WithCompactionTempDir(tempdir),
block.WithCompactionObjectOptions(
block.WithObjectMaxSizeLoadInMemory(w.config.SmallObjectSize),
block.WithObjectDownload(sourcedir),
))
),
}

if observer := w.buildSampleObserver(job.blocks[0]); observer != nil {
defer observer.Close()
options = append(options, block.WithSampleObserver(observer))
}

compacted, err := block.Compact(ctx, job.blocks, w.storage, options...)
defer func() {
if err = os.RemoveAll(tempdir); err != nil {
level.Warn(logger).Log("msg", "failed to remove compaction directory", "path", tempdir, "err", err)
@@ -458,6 +485,25 @@ func (w *Worker) runCompaction(job *compactionJob) {
_ = deleteGroup.Wait()
}

func (w *Worker) buildSampleObserver(md *metastorev1.BlockMeta) *metrics.SampleObserver {
if !w.config.MetricsExporterEnabled || md.CompactionLevel > 0 {
return nil
}
recordingTime := int64(ulid.MustParse(md.Id).Time())
pyroscopeInstanceLabel := labels.Label{
Name: "pyroscope_instance",
Value: pyroscopeInstanceHash(md.Shard, uint32(md.CreatedBy)),
}
return metrics.NewSampleObserver(recordingTime, w.exporter, w.ruler, pyroscopeInstanceLabel)
}

func pyroscopeInstanceHash(shard uint32, createdBy uint32) string {
buf := make([]byte, 8)
binary.BigEndian.PutUint32(buf[0:4], shard)
binary.BigEndian.PutUint32(buf[4:8], createdBy)
return fmt.Sprintf("%x", xxhash.Sum64(buf))
}

func (w *Worker) getBlockMetadata(logger log.Logger, job *compactionJob) error {
ctx, cancel := context.WithTimeout(job.ctx, w.config.RequestTimeout)
defer cancel()
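Taken together, the worker changes give the observer a clear lifecycle: it is built only when the feature flag is on and the job compacts level-0 blocks, labeled with a pyroscope_instance value hashed from (shard, CreatedBy) so series produced by different writers do not collide, closed when the job finishes, and the shared exporter is flushed exactly once, blocking, at shutdown. A hypothetical exporter illustrating that blocking-Flush contract (the real metrics.Exporter ships staged samples to a remote-write endpoint):

package example

import "sync"

// sample stands in for whatever payload type the exporter stages.
type sample struct{}

type bufferedExporter struct {
	mu     sync.Mutex
	staged []sample
}

func (e *bufferedExporter) stage(s sample) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.staged = append(e.staged, s)
}

// Flush drains everything staged so far and returns only after the send
// completes, which is what lets running() call it safely at shutdown.
func (e *bufferedExporter) Flush() {
	e.mu.Lock()
	batch := e.staged
	e.staged = nil
	e.mu.Unlock()
	send(batch) // blocking send; stand-in for a remote-write request
}

func send([]sample) {}
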
6 changes: 3 additions & 3 deletions pkg/experiment/compactor/compaction_worker_metrics.go
@@ -8,15 +8,15 @@ import (
"github.com/grafana/pyroscope/pkg/util"
)

type metrics struct {
type compactionWorkerMetrics struct {
jobsInProgress *prometheus.GaugeVec
jobsCompleted *prometheus.CounterVec
jobDuration *prometheus.HistogramVec
timeToCompaction *prometheus.HistogramVec
}

func newMetrics(r prometheus.Registerer) *metrics {
m := &metrics{
func newMetrics(r prometheus.Registerer) *compactionWorkerMetrics {
m := &compactionWorkerMetrics{
jobsInProgress: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "jobs_in_progress",
Help: "The number of active compaction jobs currently running.",
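The rename of the metrics struct is likely not cosmetic: compaction_worker.go now imports pkg/experiment/metrics, and within that file the import name would shadow a same-named local type. A reduced, standalone illustration of the clash, assuming a stand-in type:

package example

import "github.com/grafana/pyroscope/pkg/experiment/metrics"

// compactionWorkerMetrics stands in for the renamed worker-metrics struct.
type compactionWorkerMetrics struct{}

type worker struct {
	metrics  *compactionWorkerMetrics // fine: a field name may match a package name
	exporter metrics.Exporter
	// metrics *metrics // would not compile: "metrics" is the imported package here
}
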
(Diffs for the remaining 10 changed files are not shown.)
