From 0d9636034c9f0b1ddec5d141006146160d35f196 Mon Sep 17 00:00:00 2001 From: rahulguptajss Date: Thu, 20 Nov 2025 11:57:40 +0530 Subject: [PATCH 1/5] feat: disk cache --- cmd/exporters/prometheus/cache.go | 5 +- cmd/exporters/prometheus/cacher.go | 30 ++ cmd/exporters/prometheus/disk_cache.go | 334 ++++++++++++++++++++ cmd/exporters/prometheus/httpd.go | 113 ++++--- cmd/exporters/prometheus/prometheus.go | 97 +++++- cmd/exporters/prometheus/prometheus_test.go | 31 +- docs/prometheus-exporter.md | 40 +++ pkg/conf/conf.go | 17 +- 8 files changed, 601 insertions(+), 66 deletions(-) create mode 100644 cmd/exporters/prometheus/cacher.go create mode 100644 cmd/exporters/prometheus/disk_cache.go diff --git a/cmd/exporters/prometheus/cache.go b/cmd/exporters/prometheus/cache.go index ae73864c8..698440474 100644 --- a/cmd/exporters/prometheus/cache.go +++ b/cmd/exporters/prometheus/cache.go @@ -5,6 +5,7 @@ package prometheus import ( + "github.com/netapp/harvest/v2/pkg/set" "sync" "time" ) @@ -16,6 +17,8 @@ type cache struct { expire time.Duration } +var _ memoryCacher = (*cache)(nil) + func newCache(d time.Duration) *cache { c := cache{Mutex: &sync.Mutex{}, expire: d} c.data = make(map[string][][]byte) @@ -28,7 +31,7 @@ func (c *cache) Get() map[string][][]byte { return c.data } -func (c *cache) Put(key string, data [][]byte) { +func (c *cache) Put(key string, data [][]byte, _ *set.Set) { c.data[key] = data c.timers[key] = time.Now() } diff --git a/cmd/exporters/prometheus/cacher.go b/cmd/exporters/prometheus/cacher.go new file mode 100644 index 000000000..8fe146f5c --- /dev/null +++ b/cmd/exporters/prometheus/cacher.go @@ -0,0 +1,30 @@ +package prometheus + +import ( + "github.com/netapp/harvest/v2/pkg/set" + "io" +) + +type CacheStats struct { + NumCollectors int + NumObjects int + NumMetrics int + UniqueData map[string]map[string][]string +} + +type cacher interface { + Put(key string, data [][]byte, metricNames *set.Set) + Clean() +} + +type memoryCacher interface { + cacher + Get() map[string][][]byte +} + +type diskCacher interface { + cacher + StreamToWriter(w io.Writer) error + GetMetricCount() int + GetStats() (*CacheStats, error) +} diff --git a/cmd/exporters/prometheus/disk_cache.go b/cmd/exporters/prometheus/disk_cache.go new file mode 100644 index 000000000..5a09eebf9 --- /dev/null +++ b/cmd/exporters/prometheus/disk_cache.go @@ -0,0 +1,334 @@ +package prometheus + +import ( + "bufio" + "context" + "github.com/netapp/harvest/v2/pkg/set" + "github.com/netapp/harvest/v2/pkg/slogx" + "io" + "log/slog" + "os" + "path/filepath" + "strings" + "sync" + "time" +) + +type diskCache struct { + *sync.Mutex + files map[string]string // key -> filepath + timers map[string]time.Time // key -> timestamp + metricNames map[string]*set.Set // key -> metric names + metricCounts map[string]int // key -> number of metric lines + expire time.Duration + baseDir string + logger *slog.Logger + ctx context.Context + cancel context.CancelFunc + writerPool *sync.Pool + readerPool *sync.Pool + keyReplacer *strings.Replacer +} + +var _ diskCacher = (*diskCache)(nil) + +func newDiskCache(d time.Duration, baseDir string, logger *slog.Logger) *diskCache { + if d <= 0 { + logger.Warn("invalid expire duration, using default 5 minutes", slog.Duration("provided", d)) + d = 5 * time.Minute + } + if baseDir == "" { + logger.Warn("empty base directory provided") + return nil + } + + _ = os.RemoveAll(baseDir) + if err := os.MkdirAll(baseDir, 0750); err != nil { + logger.Warn("failed to create cache directory", 
slogx.Err(err), slog.String("dir", baseDir)) + return nil + } + + ctx, cancel := context.WithCancel(context.Background()) + dc := &diskCache{ + Mutex: &sync.Mutex{}, + files: make(map[string]string), + timers: make(map[string]time.Time), + metricNames: make(map[string]*set.Set), + metricCounts: make(map[string]int), + expire: d, + baseDir: baseDir, + logger: logger, + ctx: ctx, + cancel: cancel, + writerPool: &sync.Pool{ + New: func() any { + return bufio.NewWriterSize(nil, 64*1024) + }, + }, + readerPool: &sync.Pool{ + New: func() any { + return bufio.NewReaderSize(nil, 64*1024) + }, + }, + keyReplacer: strings.NewReplacer("/", "_", "\\", "_", ":", "_"), + } + + go dc.cleanup() + return dc +} + +// GetStats returns cache statistics. +func (dc *diskCache) GetStats() (*CacheStats, error) { + stats := &CacheStats{ + UniqueData: make(map[string]map[string][]string), + } + + seenCollectors := make(map[string]struct{}) + seenObjects := make(map[string]struct{}) + + for key := range dc.files { + if dc.isExpired(key) { + continue + } + + parts := strings.Split(key, ".") + if len(parts) < 2 { + continue + } + + collector := parts[0] + object := parts[1] + + if strings.HasPrefix(object, "metadata_") { + continue + } + + metricNames, exists := dc.metricNames[key] + if !exists || metricNames == nil || metricNames.Size() == 0 { + continue + } + + stats.NumMetrics += metricNames.Size() + + if _, exists := stats.UniqueData[collector]; !exists { + stats.UniqueData[collector] = make(map[string][]string) + seenCollectors[collector] = struct{}{} + } + + objectKey := collector + "." + object + if _, exists := stats.UniqueData[collector][object]; !exists { + seenObjects[objectKey] = struct{}{} + } + + stats.UniqueData[collector][object] = metricNames.Values() + } + + stats.NumCollectors = len(seenCollectors) + stats.NumObjects = len(seenObjects) + + return stats, nil +} + +// GetMetricCount returns the total number of cached metrics. +func (dc *diskCache) GetMetricCount() int { + count := 0 + for key := range dc.files { + if dc.isExpired(key) { + continue + } + if metricCount, exists := dc.metricCounts[key]; exists { + count += metricCount + } + } + return count +} + +// Put stores metrics to disk and updates cache metadata. +func (dc *diskCache) Put(key string, data [][]byte, metricNames *set.Set) { + filePath := dc.generateFilepath(key) + + if err := dc.writeToDisk(filePath, data); err != nil { + dc.logger.Warn("failed to write cache file", + slogx.Err(err), + slog.String("key", key), + slog.String("file", filePath)) + return + } + + dc.files[key] = filePath + dc.timers[key] = time.Now() + if metricNames != nil && metricNames.Size() > 0 { + dc.metricNames[key] = metricNames + } else { + dc.metricNames[key] = nil + } + dc.metricCounts[key] = len(data) + + dc.logger.Debug("cached metrics to disk", + slog.String("key", key), + slog.String("file", filePath), + slog.Int("metrics_count", len(data))) +} + +// StreamToWriter streams all non-expired cache files to the writer. 
+func (dc *diskCache) StreamToWriter(w io.Writer) error { + var resultErr error + errorCount := 0 + totalCount := 0 + + for key, path := range dc.files { + if dc.isExpired(key) { + continue + } + totalCount++ + + if err := dc.streamFile(path, w); err != nil { + errorCount++ + if resultErr == nil { + resultErr = err + } + dc.logger.Debug("failed to stream cache file", + slogx.Err(err), slog.String("file", path)) + } + } + + if resultErr != nil { + dc.logger.Warn("failed to stream some cache files", + slog.Int("failed_count", errorCount), + slog.Int("total_count", totalCount)) + } + return resultErr +} + +func (dc *diskCache) openFile(filePath string) (*os.File, error) { + file, err := os.Open(filePath) + if os.IsNotExist(err) { + return nil, nil + } + return file, err +} + +func (dc *diskCache) closeFile(file *os.File) { + if err := file.Close(); err != nil { + dc.logger.Debug("failed to close file", slogx.Err(err)) + } +} + +func (dc *diskCache) streamFile(filePath string, w io.Writer) error { + file, err := dc.openFile(filePath) + if err != nil { + return err + } + if file == nil { + dc.logger.Debug("file is nil", slog.String("filePath", filePath)) + return nil + } + defer dc.closeFile(file) + + reader := dc.readerPool.Get().(*bufio.Reader) + reader.Reset(file) + defer dc.readerPool.Put(reader) + + _, err = io.Copy(w, reader) + return err +} + +func (dc *diskCache) Clean() { + dc.Lock() + defer dc.Unlock() + + for key, timestamp := range dc.timers { + if time.Since(timestamp) <= dc.expire { + continue + } + filePath := dc.files[key] + + delete(dc.files, key) + delete(dc.timers, key) + delete(dc.metricNames, key) + delete(dc.metricCounts, key) + + if err := os.Remove(filePath); err != nil && !os.IsNotExist(err) { + dc.logger.Debug("failed to remove expired cache file", + slogx.Err(err), + slog.String("file", filePath)) + } + + dc.logger.Debug("expired cache entry", slog.String("key", key)) + } + + entries, err := os.ReadDir(dc.baseDir) + if err != nil { + dc.logger.Debug("failed to read cache directory", slogx.Err(err), slog.String("baseDir", dc.baseDir)) + return + } + + knownFiles := make(map[string]struct{}, len(dc.files)) + for _, path := range dc.files { + knownFiles[path] = struct{}{} + } + + for _, entry := range entries { + fullPath := filepath.Join(dc.baseDir, entry.Name()) + + if _, found := knownFiles[fullPath]; !found { + _ = os.Remove(fullPath) + } + } +} + +func (dc *diskCache) generateFilepath(key string) string { + safeKey := dc.keyReplacer.Replace(key) + return filepath.Join(dc.baseDir, safeKey+".metrics") +} + +func (dc *diskCache) writeToDisk(filePath string, data [][]byte) error { + file, err := os.Create(filePath) + if err != nil { + return err + } + defer dc.closeFile(file) + + writer := dc.writerPool.Get().(*bufio.Writer) + writer.Reset(file) + defer dc.writerPool.Put(writer) + + for _, line := range data { + if _, err := writer.Write(line); err != nil { + return err + } + if err := writer.WriteByte('\n'); err != nil { + return err + } + } + + return writer.Flush() +} + +// isExpired checks if a key is expired. 
+func (dc *diskCache) isExpired(key string) bool { + if timer, exists := dc.timers[key]; exists { + return time.Since(timer) >= dc.expire + } + return true +} + +func (dc *diskCache) cleanup() { + ticker := time.NewTicker(dc.expire / 2) // Clean twice per expiry period + defer ticker.Stop() + + for { + select { + case <-dc.ctx.Done(): + return + case <-ticker.C: + dc.Clean() + } + } +} + +func (dc *diskCache) Shutdown() { + if dc.cancel != nil { + dc.cancel() + } +} diff --git a/cmd/exporters/prometheus/httpd.go b/cmd/exporters/prometheus/httpd.go index 02f4a0ce8..0b240aa10 100644 --- a/cmd/exporters/prometheus/httpd.go +++ b/cmd/exporters/prometheus/httpd.go @@ -138,14 +138,29 @@ func (p *Prometheus) ServeMetrics(w http.ResponseWriter, r *http.Request) { return } - p.cache.Lock() - tagsSeen := make(map[string]struct{}) - w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.Header().Set("X-Content-Type-Options", "nosniff") - for _, metrics := range p.cache.Get() { - count += p.writeMetrics(w, metrics, tagsSeen) + tagsSeen := make(map[string]struct{}) + + if p.useDiskCache { + if diskcache, ok := p.cache.(*diskCache); ok { + diskcache.Lock() + count = diskcache.GetMetricCount() + err := diskcache.StreamToWriter(w) + diskcache.Unlock() + if err != nil { + p.Logger.Error("failed to stream metrics from disk cache", slogx.Err(err)) + } + } + } else { + if memCache, ok := p.cache.(*cache); ok { + memCache.Lock() + for _, metrics := range memCache.Get() { + count += p.writeMetrics(w, metrics, tagsSeen) + } + memCache.Unlock() + } } // serve our own metadata @@ -153,8 +168,6 @@ func (p *Prometheus) ServeMetrics(w http.ResponseWriter, r *http.Request) { md, _ := p.render(p.Metadata) count += p.writeMetrics(w, md, tagsSeen) - p.cache.Unlock() - // update metadata p.Metadata.Reset() err := p.Metadata.LazySetValueInt64("time", "http", time.Since(start).Microseconds()) @@ -250,45 +263,69 @@ func (p *Prometheus) ServeInfo(w http.ResponseWriter, r *http.Request) { uniqueData := map[string]map[string][]string{} - // copy cache so we don't lock it - p.cache.Lock() - cache := make(map[string][][]byte) - for key, data := range p.cache.Get() { - cache[key] = make([][]byte, len(data)) - copy(cache[key], data) - } - p.cache.Unlock() + switch c := p.cache.(type) { + case *cache: + c.Lock() + cacheData := make(map[string][][]byte) + for key, data := range c.Get() { + cacheData[key] = make([][]byte, len(data)) + copy(cacheData[key], data) + } + c.Unlock() - p.Logger.Debug("fetching cached elements", slog.Int("count", len(cache))) + p.Logger.Debug("fetching cached elements", slog.Int("count", len(cacheData))) - for key, data := range cache { - var collector, object string + for key, data := range cacheData { + var collector, object string - if keys := strings.Split(key, "."); len(keys) == 3 { - collector = keys[0] - object = keys[1] - } else { - continue - } + if keys := strings.Split(key, "."); len(keys) == 3 { + collector = keys[0] + object = keys[1] + } else { + continue + } - // skip metadata - if strings.HasPrefix(object, "metadata_") { - continue - } + // skip metadata + if strings.HasPrefix(object, "metadata_") { + continue + } + + metricNames := set.New() + for _, m := range data { + if x := strings.Split(string(m), "{"); len(x) >= 2 && x[0] != "" { + metricNames.Add(x[0]) + } + } + numMetrics += metricNames.Size() - metricNames := set.New() - for _, m := range data { - if x := strings.Split(string(m), "{"); len(x) >= 2 && x[0] != "" { - metricNames.Add(x[0]) + if _, exists := 
uniqueData[collector]; !exists { + uniqueData[collector] = make(map[string][]string) + numCollectors++ + } + if _, exists := uniqueData[collector][object]; !exists { + numObjects++ } + uniqueData[collector][object] = metricNames.Values() } - numMetrics += metricNames.Size() - - if _, exists := uniqueData[collector]; !exists { - uniqueData[collector] = make(map[string][]string) + case *diskCache: + c.Lock() + stats, err := c.GetStats() + c.Unlock() + if err != nil { + p.Logger.Error("failed to get cache statistics", slogx.Err(err)) + http.Error(w, "Failed to collect cache statistics", http.StatusInternalServerError) + return } - uniqueData[collector][object] = metricNames.Values() + numCollectors = stats.NumCollectors + numObjects = stats.NumObjects + numMetrics = stats.NumMetrics + uniqueData = stats.UniqueData + + default: + p.Logger.Error("unexpected cache type", slog.Any("type", fmt.Sprintf("%T", c))) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return } for col, perObject := range uniqueData { @@ -301,11 +338,9 @@ func (p *Prometheus) ServeInfo(w http.ResponseWriter, r *http.Request) { } } objects = append(objects, fmt.Sprintf(objectTemplate, obj, strings.Join(metrics, "\n"))) - numObjects++ } body = append(body, fmt.Sprintf(collectorTemplate, col, strings.Join(objects, "\n"))) - numCollectors++ } poller := p.Options.Poller diff --git a/cmd/exporters/prometheus/prometheus.go b/cmd/exporters/prometheus/prometheus.go index 9cc69f57f..70f441bf7 100644 --- a/cmd/exporters/prometheus/prometheus.go +++ b/cmd/exporters/prometheus/prometheus.go @@ -30,6 +30,7 @@ import ( "github.com/netapp/harvest/v2/pkg/set" "github.com/netapp/harvest/v2/pkg/slogx" "log/slog" + "path/filepath" "regexp" "slices" "sort" @@ -48,7 +49,7 @@ const ( type Prometheus struct { *exporter.AbstractExporter - cache *cache + cache cacher allowAddrs []string allowAddrsRegex []*regexp.Regexp cacheAddrs map[string]bool @@ -56,12 +57,40 @@ type Prometheus struct { addMetaTags bool globalPrefix string replacer *strings.Replacer + useDiskCache bool } func New(abc *exporter.AbstractExporter) exporter.Exporter { return &Prometheus{AbstractExporter: abc} } +func (p *Prometheus) createCache(d time.Duration) cacher { + if p.useDiskCache { + // Path is mandatory when disk cache is enabled + if p.Params.DiskCache == nil || p.Params.DiskCache.Path == "" { + p.Logger.Error("disk cache enabled but path is not specified") + return nil + } + + cacheDir := p.Params.DiskCache.Path + + // Include poller name in cache directory to avoid collisions between multiple pollers + if p.Options.Poller != "" { + cacheDir = filepath.Join(cacheDir, p.Options.Poller) + } + + dc := newDiskCache(d, cacheDir, p.Logger) + + if dc != nil { + p.Logger.Debug("disk cache configured", + slog.String("cacheDir", cacheDir)) + } + + return dc + } + return newCache(d) +} + func (p *Prometheus) Init() error { if err := p.InitAbc(); err != nil { @@ -99,11 +128,21 @@ func (p *Prometheus) Init() error { p.addMetaTags = true } + // Check if disk cache is enabled (path is mandatory) + if p.Params.DiskCache != nil && p.Params.DiskCache.Path != "" { + p.useDiskCache = true + p.Logger.Debug("disk cache enabled - will use disk-based caching for RSS optimization", + slog.String("path", p.Params.DiskCache.Path)) + } else { + p.useDiskCache = false + p.Logger.Debug("disk cache disabled - using memory-based caching") + } + // all other parameters are only relevant to the HTTP daemon if x := p.Params.CacheMaxKeep; x != nil { if d, err := 
time.ParseDuration(*x); err == nil { p.Logger.Debug("using custom cache_max_keep", slog.String("cacheMaxKeep", *x)) - p.cache = newCache(d) + p.cache = p.createCache(d) } else { p.Logger.Error("cache_max_keep", slogx.Err(err), slog.String("x", *x)) } @@ -112,12 +151,16 @@ func (p *Prometheus) Init() error { if p.cache == nil { p.Logger.Debug("using default cache_max_keep", slog.String("cacheMaxKeep", cacheMaxKeep)) if d, err := time.ParseDuration(cacheMaxKeep); err == nil { - p.cache = newCache(d) + p.cache = p.createCache(d) } else { return err } } + if p.cache == nil { + return errs.New(errs.ErrInvalidParam, "cache initialization failed") + } + // allow access to metrics only from the given plain addresses if x := p.Params.AllowedAddrs; x != nil { p.allowAddrs = *x @@ -223,13 +266,35 @@ func (p *Prometheus) Export(data *matrix.Matrix) (exporter.Stats, error) { // fix render time for metadata d := time.Since(start) + // Extract metric names from matrix for cache statistics + var prefix string + if data.Object == "" { + prefix = strings.TrimSuffix(p.globalPrefix, "_") + } else { + prefix = p.globalPrefix + data.Object + } + + metricNames := set.New() + for _, metric := range data.GetMetrics() { + if metric.IsExportable() { + metricNames.Add(prefix + "_" + metric.GetName()) + } + } + // store metrics in cache key := data.UUID + "." + data.Object + "." + data.Identifier // lock cache, to prevent HTTPd reading while we are mutating it - p.cache.Lock() - p.cache.Put(key, metrics) - p.cache.Unlock() + switch c := p.cache.(type) { + case *cache: + c.Lock() + p.cache.Put(key, metrics, metricNames) + c.Unlock() + case *diskCache: + c.Lock() + p.cache.Put(key, metrics, metricNames) + c.Unlock() + } // update metadata p.AddExportCount(uint64(len(metrics))) @@ -506,7 +571,21 @@ func (p *Prometheus) render(data *matrix.Matrix) ([][]byte, exporter.Stats) { if p.Params.SortLabels { sort.Strings(metricLabels) } - x := prefix + "_" + metric.GetName() + "{" + joinedKeys + "," + strings.Join(metricLabels, ",") + "} " + value + + buf.Reset() + buf.WriteString(prefix) + buf.WriteString("_") + buf.WriteString(metric.GetName()) + buf.WriteString("{") + buf.WriteString(joinedKeys) + buf.WriteString(",") + buf.WriteString(strings.Join(metricLabels, ",")) + buf.WriteString("} ") + buf.WriteString(value) + + xbr := buf.Bytes() + metricLine := make([]byte, len(xbr)) + copy(metricLine, xbr) prefixedName := prefix + "_" + metric.GetName() if tagged != nil && !tagged.Has(prefixedName) { @@ -517,8 +596,8 @@ func (p *Prometheus) render(data *matrix.Matrix) ([][]byte, exporter.Stats) { renderedBytes += uint64(len(help)) + uint64(len(typeT)) } - rendered = append(rendered, []byte(x)) - renderedBytes += uint64(len(x)) + rendered = append(rendered, metricLine) + renderedBytes += uint64(len(metricLine)) // scalar metric } else { buf.Reset() diff --git a/cmd/exporters/prometheus/prometheus_test.go b/cmd/exporters/prometheus/prometheus_test.go index 16d268984..556ef4b11 100644 --- a/cmd/exporters/prometheus/prometheus_test.go +++ b/cmd/exporters/prometheus/prometheus_test.go @@ -147,9 +147,12 @@ net_app_bike_max_speed{} 3`, "bike"}, prom := p.(*Prometheus) var lines []string - for _, metrics := range prom.cache.Get() { - for _, metric := range metrics { - lines = append(lines, string(metric)) + + if memCache, ok := prom.cache.(*cache); ok { + for _, metrics := range memCache.Get() { + for _, metric := range metrics { + lines = append(lines, string(metric)) + } } } @@ -184,9 +187,12 @@ 
netapp_change_log{category="metric",cluster="umeng-aff300-01-02",object="volume" prom := p.(*Prometheus) var lines []string - for _, metrics := range prom.cache.Get() { - for _, metric := range metrics { - lines = append(lines, string(metric)) + + if memCache, ok := prom.cache.(*cache); ok { + for _, metrics := range memCache.Get() { + for _, metric := range metrics { + lines = append(lines, string(metric)) + } } } @@ -258,11 +264,14 @@ func TestRenderHistogramExample(t *testing.T) { prom := p.(*Prometheus) var lines []string - for _, metrics := range prom.cache.Get() { - for _, metricLine := range metrics { - sline := string(metricLine) - if !strings.HasPrefix(sline, "#") { - lines = append(lines, sline) + + if memCache, ok := prom.cache.(*cache); ok { + for _, metrics := range memCache.Get() { + for _, metricLine := range metrics { + sline := string(metricLine) + if !strings.HasPrefix(sline, "#") { + lines = append(lines, sline) + } } } } diff --git a/docs/prometheus-exporter.md b/docs/prometheus-exporter.md index 683419bc5..b60738be2 100644 --- a/docs/prometheus-exporter.md +++ b/docs/prometheus-exporter.md @@ -58,6 +58,7 @@ An overview of all parameters: | [`allow_addrs`](#allow_addrs) | list of strings, optional | allow access only if host matches any of the provided addresses | | | [`allow_addrs_regex`](#allow_addrs_regex) | list of strings, optional | allow access only if host address matches at least one of the regular expressions | | | `cache_max_keep` | string (Go duration format), optional | maximum amount of time metrics are cached (in case Prometheus does not timely collect the metrics) | `5m` | +| [`disk_cache`](#disk_cache) | object, optional | disk-based cache configuration | | | `global_prefix` | string, optional | add a prefix to all metrics (e.g. `netapp_`) | | | `local_http_addr` | string, optional | address of the HTTP server Harvest starts for Prometheus to scrape:
use `localhost` to serve only on the local machine
use `0.0.0.0` (default) if Prometheus is scrapping from another machine | `0.0.0.0` | | `port_range` | int-int (range), overrides `port` if specified | lower port to upper port (inclusive) of the HTTP end-point to create when a poller specifies this exporter. Starting at lower port, each free port will be tried sequentially up to the upper port. | | @@ -221,6 +222,45 @@ Exporters: Access will only be allowed from the IP4 range `192.168.0.0`-`192.168.0.255`. +### disk_cache +The `disk_cache` parameter enables disk-based staging of metrics before they are served to Prometheus. Instead of storing formatted metrics in memory, Harvest flushes them to disk files. When Prometheus scrapes the `/metrics` endpoint, Harvest reads these cached files from disk and streams them directly to Prometheus. This approach reduces memory overhead, making it ideal for large deployments with many metrics. + +**Configuration:** + +The `disk_cache` parameter requires a `path` field that specifies the directory where cache files will be stored. The path is **mandatory** when using disk cache. + +**Notes:** + +- The `path` is **required** when using `disk_cache` +- Harvest will automatically create a subdirectory for each poller to avoid conflicts between multiple pollers +- The cache directory is cleared on startup +- Ensure the specified directory is writable by the Harvest process + +**Example:** + +```yaml +Exporters: + prom_disk: + exporter: Prometheus + port_range: 13000-13100 + disk_cache: + path: /var/lib/harvest/cache + +Pollers: + cluster-01: + addr: 10.0.1.1 + exporters: + - prom_disk + cluster-02: + addr: 10.0.1.2 + exporters: + - prom_disk +``` + +In this example, cache files will be created in: +- `/var/lib/harvest/cache/cluster-01/` +- `/var/lib/harvest/cache/cluster-02/` + ## Configure Prometheus to scrape Harvest pollers There are two ways to tell Prometheus how to scrape Harvest: using HTTP service discovery (SD) or listing each poller diff --git a/pkg/conf/conf.go b/pkg/conf/conf.go index 2a4949d96..c65fa96a0 100644 --- a/pkg/conf/conf.go +++ b/pkg/conf/conf.go @@ -785,6 +785,10 @@ func ZapiPoller(n *node.Node) *Poller { return &p } +type DiskCacheConfig struct { + Path string `yaml:"path"` +} + type Exporter struct { Port *int `yaml:"port,omitempty"` PortRange *IntRange `yaml:"port_range,omitempty"` @@ -804,12 +808,13 @@ type Exporter struct { TLS TLS `yaml:"tls,omitempty"` // InfluxDB specific - Bucket *string `yaml:"bucket,omitempty"` - Org *string `yaml:"org,omitempty"` - Token *string `yaml:"token,omitempty"` - Precision *string `yaml:"precision,omitempty"` - ClientTimeout *string `yaml:"client_timeout,omitempty"` - Version *string `yaml:"version,omitempty"` + Bucket *string `yaml:"bucket,omitempty"` + Org *string `yaml:"org,omitempty"` + Token *string `yaml:"token,omitempty"` + Precision *string `yaml:"precision,omitempty"` + ClientTimeout *string `yaml:"client_timeout,omitempty"` + Version *string `yaml:"version,omitempty"` + DiskCache *DiskCacheConfig `yaml:"disk_cache,omitempty"` IsTest bool `yaml:"-"` // true when run from unit tests IsEmbedded bool `yaml:"-"` // true when the exporter is embedded in a poller From fc17a2536431ed6e1146c07e67234183c7c6d180 Mon Sep 17 00:00:00 2001 From: rahulguptajss Date: Thu, 20 Nov 2025 13:34:12 +0530 Subject: [PATCH 2/5] feat: disk cache --- cmd/exporters/prometheus/httpd.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cmd/exporters/prometheus/httpd.go b/cmd/exporters/prometheus/httpd.go index 0b240aa10..009508fa2 100644 --- 
a/cmd/exporters/prometheus/httpd.go +++ b/cmd/exporters/prometheus/httpd.go @@ -10,8 +10,6 @@ import ( "bytes" "errors" "fmt" - "github.com/netapp/harvest/v2/pkg/set" - "github.com/netapp/harvest/v2/pkg/slogx" "io" "log/slog" "net" @@ -22,6 +20,9 @@ import ( "strconv" "strings" "time" + + "github.com/netapp/harvest/v2/pkg/set" + "github.com/netapp/harvest/v2/pkg/slogx" ) func (p *Prometheus) startHTTPD(addr string, port int) { @@ -323,7 +324,7 @@ func (p *Prometheus) ServeInfo(w http.ResponseWriter, r *http.Request) { uniqueData = stats.UniqueData default: - p.Logger.Error("unexpected cache type", slog.Any("type", fmt.Sprintf("%T", c))) + p.Logger.Error("unexpected cache type") http.Error(w, "Internal server error", http.StatusInternalServerError) return } From 1c2fd4194a53da50efa161f6be41810d775020e1 Mon Sep 17 00:00:00 2001 From: rahulguptajss Date: Wed, 26 Nov 2025 11:34:20 +0530 Subject: [PATCH 3/5] feat: disk cache --- cmd/exporters/prometheus/cache.go | 2 - cmd/exporters/prometheus/cacher.go | 30 ---------- cmd/exporters/prometheus/disk_cache.go | 10 +++- cmd/exporters/prometheus/httpd.go | 66 +++++++++------------ cmd/exporters/prometheus/prometheus.go | 41 +++++++------ cmd/exporters/prometheus/prometheus_test.go | 35 +++++------ 6 files changed, 71 insertions(+), 113 deletions(-) delete mode 100644 cmd/exporters/prometheus/cacher.go diff --git a/cmd/exporters/prometheus/cache.go b/cmd/exporters/prometheus/cache.go index 698440474..bccceb91a 100644 --- a/cmd/exporters/prometheus/cache.go +++ b/cmd/exporters/prometheus/cache.go @@ -17,8 +17,6 @@ type cache struct { expire time.Duration } -var _ memoryCacher = (*cache)(nil) - func newCache(d time.Duration) *cache { c := cache{Mutex: &sync.Mutex{}, expire: d} c.data = make(map[string][][]byte) diff --git a/cmd/exporters/prometheus/cacher.go b/cmd/exporters/prometheus/cacher.go deleted file mode 100644 index 8fe146f5c..000000000 --- a/cmd/exporters/prometheus/cacher.go +++ /dev/null @@ -1,30 +0,0 @@ -package prometheus - -import ( - "github.com/netapp/harvest/v2/pkg/set" - "io" -) - -type CacheStats struct { - NumCollectors int - NumObjects int - NumMetrics int - UniqueData map[string]map[string][]string -} - -type cacher interface { - Put(key string, data [][]byte, metricNames *set.Set) - Clean() -} - -type memoryCacher interface { - cacher - Get() map[string][][]byte -} - -type diskCacher interface { - cacher - StreamToWriter(w io.Writer) error - GetMetricCount() int - GetStats() (*CacheStats, error) -} diff --git a/cmd/exporters/prometheus/disk_cache.go b/cmd/exporters/prometheus/disk_cache.go index 5a09eebf9..b08b9a849 100644 --- a/cmd/exporters/prometheus/disk_cache.go +++ b/cmd/exporters/prometheus/disk_cache.go @@ -14,6 +14,14 @@ import ( "time" ) +// CacheStats holds statistics about cached metrics +type CacheStats struct { + NumCollectors int + NumObjects int + NumMetrics int + UniqueData map[string]map[string][]string +} + type diskCache struct { *sync.Mutex files map[string]string // key -> filepath @@ -30,8 +38,6 @@ type diskCache struct { keyReplacer *strings.Replacer } -var _ diskCacher = (*diskCache)(nil) - func newDiskCache(d time.Duration, baseDir string, logger *slog.Logger) *diskCache { if d <= 0 { logger.Warn("invalid expire duration, using default 5 minutes", slog.Duration("provided", d)) diff --git a/cmd/exporters/prometheus/httpd.go b/cmd/exporters/prometheus/httpd.go index 009508fa2..4b6e62e6e 100644 --- a/cmd/exporters/prometheus/httpd.go +++ b/cmd/exporters/prometheus/httpd.go @@ -145,23 +145,19 @@ func (p 
*Prometheus) ServeMetrics(w http.ResponseWriter, r *http.Request) { tagsSeen := make(map[string]struct{}) if p.useDiskCache { - if diskcache, ok := p.cache.(*diskCache); ok { - diskcache.Lock() - count = diskcache.GetMetricCount() - err := diskcache.StreamToWriter(w) - diskcache.Unlock() - if err != nil { - p.Logger.Error("failed to stream metrics from disk cache", slogx.Err(err)) - } + p.diskCache.Lock() + count = p.diskCache.GetMetricCount() + err := p.diskCache.StreamToWriter(w) + p.diskCache.Unlock() + if err != nil { + p.Logger.Error("failed to stream metrics from disk cache", slogx.Err(err)) } } else { - if memCache, ok := p.cache.(*cache); ok { - memCache.Lock() - for _, metrics := range memCache.Get() { - count += p.writeMetrics(w, metrics, tagsSeen) - } - memCache.Unlock() + p.memoryCache.Lock() + for _, metrics := range p.memoryCache.Get() { + count += p.writeMetrics(w, metrics, tagsSeen) } + p.memoryCache.Unlock() } // serve our own metadata @@ -264,15 +260,28 @@ func (p *Prometheus) ServeInfo(w http.ResponseWriter, r *http.Request) { uniqueData := map[string]map[string][]string{} - switch c := p.cache.(type) { - case *cache: - c.Lock() + if p.useDiskCache { + p.diskCache.Lock() + stats, err := p.diskCache.GetStats() + p.diskCache.Unlock() + if err != nil { + p.Logger.Error("failed to get cache statistics", slogx.Err(err)) + http.Error(w, "Failed to collect cache statistics", http.StatusInternalServerError) + return + } + + numCollectors = stats.NumCollectors + numObjects = stats.NumObjects + numMetrics = stats.NumMetrics + uniqueData = stats.UniqueData + } else { + p.memoryCache.Lock() cacheData := make(map[string][][]byte) - for key, data := range c.Get() { + for key, data := range p.memoryCache.Get() { cacheData[key] = make([][]byte, len(data)) copy(cacheData[key], data) } - c.Unlock() + p.memoryCache.Unlock() p.Logger.Debug("fetching cached elements", slog.Int("count", len(cacheData))) @@ -308,25 +317,6 @@ func (p *Prometheus) ServeInfo(w http.ResponseWriter, r *http.Request) { } uniqueData[collector][object] = metricNames.Values() } - case *diskCache: - c.Lock() - stats, err := c.GetStats() - c.Unlock() - if err != nil { - p.Logger.Error("failed to get cache statistics", slogx.Err(err)) - http.Error(w, "Failed to collect cache statistics", http.StatusInternalServerError) - return - } - - numCollectors = stats.NumCollectors - numObjects = stats.NumObjects - numMetrics = stats.NumMetrics - uniqueData = stats.UniqueData - - default: - p.Logger.Error("unexpected cache type") - http.Error(w, "Internal server error", http.StatusInternalServerError) - return } for col, perObject := range uniqueData { diff --git a/cmd/exporters/prometheus/prometheus.go b/cmd/exporters/prometheus/prometheus.go index 70f441bf7..73fa275fa 100644 --- a/cmd/exporters/prometheus/prometheus.go +++ b/cmd/exporters/prometheus/prometheus.go @@ -49,7 +49,8 @@ const ( type Prometheus struct { *exporter.AbstractExporter - cache cacher + memoryCache *cache + diskCache *diskCache allowAddrs []string allowAddrsRegex []*regexp.Regexp cacheAddrs map[string]bool @@ -64,12 +65,12 @@ func New(abc *exporter.AbstractExporter) exporter.Exporter { return &Prometheus{AbstractExporter: abc} } -func (p *Prometheus) createCache(d time.Duration) cacher { +func (p *Prometheus) createCache(d time.Duration) { if p.useDiskCache { // Path is mandatory when disk cache is enabled if p.Params.DiskCache == nil || p.Params.DiskCache.Path == "" { p.Logger.Error("disk cache enabled but path is not specified") - return nil + return } 
cacheDir := p.Params.DiskCache.Path @@ -79,16 +80,15 @@ func (p *Prometheus) createCache(d time.Duration) cacher { cacheDir = filepath.Join(cacheDir, p.Options.Poller) } - dc := newDiskCache(d, cacheDir, p.Logger) + p.diskCache = newDiskCache(d, cacheDir, p.Logger) - if dc != nil { + if p.diskCache != nil { p.Logger.Debug("disk cache configured", slog.String("cacheDir", cacheDir)) } - - return dc + } else { + p.memoryCache = newCache(d) } - return newCache(d) } func (p *Prometheus) Init() error { @@ -142,22 +142,22 @@ func (p *Prometheus) Init() error { if x := p.Params.CacheMaxKeep; x != nil { if d, err := time.ParseDuration(*x); err == nil { p.Logger.Debug("using custom cache_max_keep", slog.String("cacheMaxKeep", *x)) - p.cache = p.createCache(d) + p.createCache(d) } else { p.Logger.Error("cache_max_keep", slogx.Err(err), slog.String("x", *x)) } } - if p.cache == nil { + if p.memoryCache == nil && p.diskCache == nil { p.Logger.Debug("using default cache_max_keep", slog.String("cacheMaxKeep", cacheMaxKeep)) if d, err := time.ParseDuration(cacheMaxKeep); err == nil { - p.cache = p.createCache(d) + p.createCache(d) } else { return err } } - if p.cache == nil { + if p.memoryCache == nil && p.diskCache == nil { return errs.New(errs.ErrInvalidParam, "cache initialization failed") } @@ -285,15 +285,14 @@ func (p *Prometheus) Export(data *matrix.Matrix) (exporter.Stats, error) { key := data.UUID + "." + data.Object + "." + data.Identifier // lock cache, to prevent HTTPd reading while we are mutating it - switch c := p.cache.(type) { - case *cache: - c.Lock() - p.cache.Put(key, metrics, metricNames) - c.Unlock() - case *diskCache: - c.Lock() - p.cache.Put(key, metrics, metricNames) - c.Unlock() + if p.useDiskCache { + p.diskCache.Lock() + p.diskCache.Put(key, metrics, metricNames) + p.diskCache.Unlock() + } else { + p.memoryCache.Lock() + p.memoryCache.Put(key, metrics, metricNames) + p.memoryCache.Unlock() } // update metadata diff --git a/cmd/exporters/prometheus/prometheus_test.go b/cmd/exporters/prometheus/prometheus_test.go index 556ef4b11..a38dcb244 100644 --- a/cmd/exporters/prometheus/prometheus_test.go +++ b/cmd/exporters/prometheus/prometheus_test.go @@ -5,15 +5,16 @@ package prometheus import ( + "slices" + "strings" + "testing" + "github.com/google/go-cmp/cmp" "github.com/netapp/harvest/v2/assert" "github.com/netapp/harvest/v2/cmd/poller/exporter" "github.com/netapp/harvest/v2/cmd/poller/options" "github.com/netapp/harvest/v2/pkg/conf" "github.com/netapp/harvest/v2/pkg/matrix" - "slices" - "strings" - "testing" ) func TestFilterMetaTags(t *testing.T) { @@ -148,11 +149,9 @@ net_app_bike_max_speed{} 3`, "bike"}, prom := p.(*Prometheus) var lines []string - if memCache, ok := prom.cache.(*cache); ok { - for _, metrics := range memCache.Get() { - for _, metric := range metrics { - lines = append(lines, string(metric)) - } + for _, metrics := range prom.memoryCache.Get() { + for _, metric := range metrics { + lines = append(lines, string(metric)) } } @@ -188,11 +187,9 @@ netapp_change_log{category="metric",cluster="umeng-aff300-01-02",object="volume" prom := p.(*Prometheus) var lines []string - if memCache, ok := prom.cache.(*cache); ok { - for _, metrics := range memCache.Get() { - for _, metric := range metrics { - lines = append(lines, string(metric)) - } + for _, metrics := range prom.memoryCache.Get() { + for _, metric := range metrics { + lines = append(lines, string(metric)) } } @@ -265,13 +262,11 @@ func TestRenderHistogramExample(t *testing.T) { prom := p.(*Prometheus) var lines 
[]string - if memCache, ok := prom.cache.(*cache); ok { - for _, metrics := range memCache.Get() { - for _, metricLine := range metrics { - sline := string(metricLine) - if !strings.HasPrefix(sline, "#") { - lines = append(lines, sline) - } + for _, metrics := range prom.memoryCache.Get() { + for _, metricLine := range metrics { + sline := string(metricLine) + if !strings.HasPrefix(sline, "#") { + lines = append(lines, sline) } } } From ac3c99efaa86a1f353b3b187a0c3727beb28dac0 Mon Sep 17 00:00:00 2001 From: rahulguptajss Date: Wed, 3 Dec 2025 14:29:56 +0530 Subject: [PATCH 4/5] feat: disk cache --- cmd/exporters/prometheus/prometheus.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cmd/exporters/prometheus/prometheus.go b/cmd/exporters/prometheus/prometheus.go index 73fa275fa..d9b9bbfdd 100644 --- a/cmd/exporters/prometheus/prometheus.go +++ b/cmd/exporters/prometheus/prometheus.go @@ -729,6 +729,10 @@ func (p *Prometheus) render(data *matrix.Matrix) ([][]byte, exporter.Stats) { } } + // Both memory and disk cache add a newline character after each metric line + // when serving via HTTP (see writeMetric() and writeToDisk()) + renderedBytes += uint64(len(rendered)) // Add 1 byte per line for '\n' + stats := exporter.Stats{ InstancesExported: instancesExported, MetricsExported: uint64(len(rendered)), From 1ea6d388a86d437084af62e26b764fe6ccf86c5f Mon Sep 17 00:00:00 2001 From: Chris Grindstaff Date: Wed, 3 Dec 2025 08:11:30 -0500 Subject: [PATCH 5/5] feat: disk cache (#4051) --- cmd/exporters/prometheus/cache.go | 166 ++++++++++++++++++- cmd/exporters/prometheus/cache_test.go | 38 +++++ cmd/exporters/prometheus/disk_cache.go | 50 +++++- cmd/exporters/prometheus/httpd.go | 173 +++----------------- cmd/exporters/prometheus/prometheus.go | 110 ++++--------- cmd/exporters/prometheus/prometheus_test.go | 37 +---- 6 files changed, 304 insertions(+), 270 deletions(-) create mode 100644 cmd/exporters/prometheus/cache_test.go diff --git a/cmd/exporters/prometheus/cache.go b/cmd/exporters/prometheus/cache.go index bccceb91a..9f5da5379 100644 --- a/cmd/exporters/prometheus/cache.go +++ b/cmd/exporters/prometheus/cache.go @@ -5,36 +5,188 @@ package prometheus import ( + "bytes" "github.com/netapp/harvest/v2/pkg/set" + "github.com/netapp/harvest/v2/pkg/slogx" + "io" + "log/slog" + "net/http" + "strings" "sync" "time" ) -type cache struct { - *sync.Mutex +type cacher interface { + getOverview() (*CacheStats, error) + exportMetrics(key string, data [][]byte, names *set.Set) + streamMetrics(w http.ResponseWriter, seen map[string]struct{}, metrics [][]byte) (int, error) + isValid() bool +} + +type memCache struct { + mu *sync.Mutex + logger *slog.Logger data map[string][][]byte timers map[string]time.Time expire time.Duration } -func newCache(d time.Duration) *cache { - c := cache{Mutex: &sync.Mutex{}, expire: d} +func (c *memCache) isValid() bool { + return true +} + +func (c *memCache) getOverview() (*CacheStats, error) { + c.mu.Lock() + cacheData := make(map[string][][]byte) + for key, data := range c.Get() { + cacheData[key] = make([][]byte, len(data)) + copy(cacheData[key], data) + } + c.mu.Unlock() + + stats := &CacheStats{ + UniqueData: make(map[string]map[string][]string), + } + + for key, data := range cacheData { + var collector, object string + + if keys := strings.Split(key, "."); len(keys) == 3 { + collector = keys[0] + object = keys[1] + } else { + continue + } + + // skip metadata + if strings.HasPrefix(object, "metadata_") { + continue + } + + metricNames := set.New() + for _, m := 
range data { + if x := strings.Split(string(m), "{"); len(x) >= 2 && x[0] != "" { + metricNames.Add(x[0]) + } + } + stats.NumMetrics += metricNames.Size() + + if _, exists := stats.UniqueData[collector]; !exists { + stats.UniqueData[collector] = make(map[string][]string) + stats.NumCollectors++ + } + if _, exists := stats.UniqueData[collector][object]; !exists { + stats.NumObjects++ + } + stats.UniqueData[collector][object] = metricNames.Values() + } + + return stats, nil +} + +func (c *memCache) exportMetrics(key string, data [][]byte, metricNames *set.Set) { + c.Put(key, data, metricNames) +} + +func (c *memCache) streamMetrics(w http.ResponseWriter, tagsSeen map[string]struct{}, metrics [][]byte) (int, error) { + c.mu.Lock() + var count int + if metrics == nil { + // stream all cached metrics + for _, metrics := range c.Get() { + count += c.writeMetrics(w, metrics, tagsSeen) + } + } else { + // stream only provided metrics + count += c.writeMetrics(w, metrics, tagsSeen) + } + + c.mu.Unlock() + + return count, nil +} + +// writeMetrics writes metrics to the writer, skipping duplicates. +// Normally Render() only adds one TYPE/HELP for each metric type. +// Some metric types (e.g., metadata_collector_metrics) are submitted from multiple collectors. +// That causes duplicates that are suppressed in this function. +// The seen map is used to keep track of which metrics have been added. +func (c *memCache) writeMetrics(w io.Writer, metrics [][]byte, tagsSeen map[string]struct{}) int { + + var count int + + for i := 0; i < len(metrics); i++ { + metric := metrics[i] + if bytes.HasPrefix(metric, []byte("# ")) { + + // Find the metric name and check if it has been seen before + var ( + spacesSeen int + space2Index int + ) + + for j := range metric { + if metric[j] == ' ' { + spacesSeen++ + if spacesSeen == 2 { + space2Index = j + } else if spacesSeen == 3 { + name := string(metric[space2Index+1 : j]) + if _, ok := tagsSeen[name]; !ok { + tagsSeen[name] = struct{}{} + c.writeMetric(w, metric) + count++ + if i+1 < len(metrics) { + c.writeMetric(w, metrics[i+1]) + count++ + i++ + } + } + break + } + } + } + } else { + c.writeMetric(w, metric) + count++ + } + } + + return count +} + +func (c *memCache) writeMetric(w io.Writer, data []byte) { + _, err := w.Write(data) + if err != nil { + c.logger.Error("write metrics", slogx.Err(err)) + } + _, err = w.Write([]byte("\n")) + if err != nil { + c.logger.Error("write newline", slogx.Err(err)) + } +} + +func newMemCache(l *slog.Logger, d time.Duration) *memCache { + c := memCache{mu: &sync.Mutex{}, expire: d, logger: l} c.data = make(map[string][][]byte) c.timers = make(map[string]time.Time) return &c } -func (c *cache) Get() map[string][][]byte { +func (c *memCache) Get() map[string][][]byte { c.Clean() return c.data } -func (c *cache) Put(key string, data [][]byte, _ *set.Set) { +func (c *memCache) Put(key string, data [][]byte, _ *set.Set) { + c.mu.Lock() + defer c.mu.Unlock() + c.data[key] = data c.timers[key] = time.Now() } -func (c *cache) Clean() { +func (c *memCache) Clean() { for k, t := range c.timers { if time.Since(t) > c.expire { delete(c.timers, k) diff --git a/cmd/exporters/prometheus/cache_test.go b/cmd/exporters/prometheus/cache_test.go new file mode 100644 index 000000000..02033bf6f --- /dev/null +++ b/cmd/exporters/prometheus/cache_test.go @@ -0,0 +1,38 @@ +package prometheus + +import ( + "github.com/google/go-cmp/cmp" + "github.com/netapp/harvest/v2/assert" + "strings" + "testing" +) + +func Test_memcache_streamMetrics(t *testing.T) { 
+ example := [][]byte{ + []byte(`# HELP some_metric help text`), + []byte(`# TYPE some_metric type`), + []byte(`some_metric{node="node_1"} 0.0`), + []byte(`# HELP some_other_metric help text`), + []byte(`# TYPE some_other_metric type`), + []byte(`some_other_metric{node="node_2"} 0.0`), + []byte(`# HELP some_other_metric DUPLICATE help text`), + []byte(`# TYPE some_other_metric type`), + []byte(`some_other_metric{node="node_3"} 0.0`), + } + + expected := `# HELP some_metric help text +# TYPE some_metric type +some_metric{node="node_1"} 0.0 +# HELP some_other_metric help text +# TYPE some_other_metric type +some_other_metric{node="node_2"} 0.0 +some_other_metric{node="node_3"} 0.0 +` + m := memCache{} + seen := make(map[string]struct{}) + var w strings.Builder + _ = m.writeMetrics(&w, example, seen) + + diff := cmp.Diff(w.String(), expected) + assert.Equal(t, diff, "") +} diff --git a/cmd/exporters/prometheus/disk_cache.go b/cmd/exporters/prometheus/disk_cache.go index b08b9a849..394e9751c 100644 --- a/cmd/exporters/prometheus/disk_cache.go +++ b/cmd/exporters/prometheus/disk_cache.go @@ -7,6 +7,7 @@ import ( "github.com/netapp/harvest/v2/pkg/slogx" "io" "log/slog" + "net/http" "os" "path/filepath" "strings" @@ -23,7 +24,7 @@ type CacheStats struct { } type diskCache struct { - *sync.Mutex + mu *sync.Mutex files map[string]string // key -> filepath timers map[string]time.Time // key -> timestamp metricNames map[string]*set.Set // key -> metric names @@ -38,6 +39,40 @@ type diskCache struct { keyReplacer *strings.Replacer } +func (dc *diskCache) isValid() bool { + return dc != nil && dc.baseDir != "" +} + +func (dc *diskCache) getOverview() (*CacheStats, error) { + dc.mu.Lock() + stats, err := dc.GetStats() + dc.mu.Unlock() + if err != nil { + return nil, err + } + + return stats, nil +} + +func (dc *diskCache) exportMetrics(key string, data [][]byte, metricNames *set.Set) { + dc.Put(key, data, metricNames) +} + +func (dc *diskCache) streamMetrics(w http.ResponseWriter, _ map[string]struct{}, metrics [][]byte) (int, error) { + // since the disk cache streams all cached metrics including metadata, we ignore streaming when metrics is not nil + if metrics != nil { + return 0, nil + } + dc.mu.Lock() + defer dc.mu.Unlock() + + err := dc.streamToWriter(w) + if err != nil { + return 0, err + } + return dc.GetMetricCount(), nil +} + func newDiskCache(d time.Duration, baseDir string, logger *slog.Logger) *diskCache { if d <= 0 { logger.Warn("invalid expire duration, using default 5 minutes", slog.Duration("provided", d)) @@ -56,7 +91,7 @@ func newDiskCache(d time.Duration, baseDir string, logger *slog.Logger) *diskCac ctx, cancel := context.WithCancel(context.Background()) dc := &diskCache{ - Mutex: &sync.Mutex{}, + mu: &sync.Mutex{}, files: make(map[string]string), timers: make(map[string]time.Time), metricNames: make(map[string]*set.Set), @@ -151,6 +186,9 @@ func (dc *diskCache) GetMetricCount() int { // Put stores metrics to disk and updates cache metadata. func (dc *diskCache) Put(key string, data [][]byte, metricNames *set.Set) { + dc.mu.Lock() + defer dc.mu.Unlock() + filePath := dc.generateFilepath(key) if err := dc.writeToDisk(filePath, data); err != nil { @@ -176,8 +214,8 @@ func (dc *diskCache) Put(key string, data [][]byte, metricNames *set.Set) { slog.Int("metrics_count", len(data))) } -// StreamToWriter streams all non-expired cache files to the writer. -func (dc *diskCache) StreamToWriter(w io.Writer) error { +// streamToWriter streams all non-expired cache files to the writer. 
+func (dc *diskCache) streamToWriter(w io.Writer) error { var resultErr error errorCount := 0 totalCount := 0 @@ -240,8 +278,8 @@ func (dc *diskCache) streamFile(filePath string, w io.Writer) error { } func (dc *diskCache) Clean() { - dc.Lock() - defer dc.Unlock() + dc.mu.Lock() + defer dc.mu.Unlock() for key, timestamp := range dc.timers { if time.Since(timestamp) <= dc.expire { diff --git a/cmd/exporters/prometheus/httpd.go b/cmd/exporters/prometheus/httpd.go index 4b6e62e6e..f8f82e451 100644 --- a/cmd/exporters/prometheus/httpd.go +++ b/cmd/exporters/prometheus/httpd.go @@ -7,10 +7,8 @@ package prometheus import ( - "bytes" "errors" "fmt" - "io" "log/slog" "net" "net/http" @@ -21,7 +19,6 @@ import ( "strings" "time" - "github.com/netapp/harvest/v2/pkg/set" "github.com/netapp/harvest/v2/pkg/slogx" ) @@ -144,30 +141,22 @@ func (p *Prometheus) ServeMetrics(w http.ResponseWriter, r *http.Request) { tagsSeen := make(map[string]struct{}) - if p.useDiskCache { - p.diskCache.Lock() - count = p.diskCache.GetMetricCount() - err := p.diskCache.StreamToWriter(w) - p.diskCache.Unlock() - if err != nil { - p.Logger.Error("failed to stream metrics from disk cache", slogx.Err(err)) - } - } else { - p.memoryCache.Lock() - for _, metrics := range p.memoryCache.Get() { - count += p.writeMetrics(w, metrics, tagsSeen) - } - p.memoryCache.Unlock() + _, err := p.aCache.streamMetrics(w, tagsSeen, nil) + if err != nil { + p.Logger.Error("failed to stream metrics", slogx.Err(err)) } // serve our own metadata // notice that some values are always taken from previous session - md, _ := p.render(p.Metadata) - count += p.writeMetrics(w, md, tagsSeen) + md, _, _ := p.render(p.Metadata) + _, err = p.aCache.streamMetrics(w, tagsSeen, md) + if err != nil { + p.Logger.Error("failed to stream metadata metrics", slogx.Err(err)) + } // update metadata p.Metadata.Reset() - err := p.Metadata.LazySetValueInt64("time", "http", time.Since(start).Microseconds()) + err = p.Metadata.LazySetValueInt64("time", "http", time.Since(start).Microseconds()) if err != nil { p.Logger.Error("metadata time", slogx.Err(err)) } @@ -177,72 +166,18 @@ func (p *Prometheus) ServeMetrics(w http.ResponseWriter, r *http.Request) { } } -// writeMetrics writes metrics to the writer, skipping duplicates. -// Normally Render() only adds one TYPE/HELP for each metric type. -// Some metric types (e.g., metadata_collector_metrics) are submitted from multiple collectors. -// That causes duplicates that are suppressed in this function. -// The seen map is used to keep track of which metrics have been added. 
-func (p *Prometheus) writeMetrics(w io.Writer, metrics [][]byte, tagsSeen map[string]struct{}) int { - - var count int - - for i := 0; i < len(metrics); i++ { - metric := metrics[i] - if bytes.HasPrefix(metric, []byte("# ")) { - - // Find the metric name and check if it has been seen before - var ( - spacesSeen int - space2Index int - ) - - for j := range metric { - if metric[j] == ' ' { - spacesSeen++ - if spacesSeen == 2 { - space2Index = j - } else if spacesSeen == 3 { - name := string(metric[space2Index+1 : j]) - if _, ok := tagsSeen[name]; !ok { - tagsSeen[name] = struct{}{} - p.writeMetric(w, metric) - count++ - if i+1 < len(metrics) { - p.writeMetric(w, metrics[i+1]) - count++ - i++ - } - } - break - } - } - } - } else { - p.writeMetric(w, metric) - count++ - } - } - - return count -} - -func (p *Prometheus) writeMetric(w io.Writer, data []byte) { - _, err := w.Write(data) - if err != nil { - p.Logger.Error("write metrics", slogx.Err(err)) - return - } - _, err = w.Write([]byte("\n")) - if err != nil { - p.Logger.Error("write newline", slogx.Err(err)) - return - } -} - // ServeInfo provides a human-friendly overview of metric types and source collectors // this is done in a very inefficient way, by "reverse engineering" the metrics. // That's probably ok, since we don't expect this to be called often. func (p *Prometheus) ServeInfo(w http.ResponseWriter, r *http.Request) { + + var ( + numCollectors int + numObjects int + numMetrics int + uniqueData map[string]map[string][]string + ) + start := time.Now() if !p.checkAddr(r.RemoteAddr) { @@ -254,70 +189,16 @@ func (p *Prometheus) ServeInfo(w http.ResponseWriter, r *http.Request) { body := make([]string, 0) - numCollectors := 0 - numObjects := 0 - numMetrics := 0 - - uniqueData := map[string]map[string][]string{} - - if p.useDiskCache { - p.diskCache.Lock() - stats, err := p.diskCache.GetStats() - p.diskCache.Unlock() - if err != nil { - p.Logger.Error("failed to get cache statistics", slogx.Err(err)) - http.Error(w, "Failed to collect cache statistics", http.StatusInternalServerError) - return - } - - numCollectors = stats.NumCollectors - numObjects = stats.NumObjects - numMetrics = stats.NumMetrics - uniqueData = stats.UniqueData - } else { - p.memoryCache.Lock() - cacheData := make(map[string][][]byte) - for key, data := range p.memoryCache.Get() { - cacheData[key] = make([][]byte, len(data)) - copy(cacheData[key], data) - } - p.memoryCache.Unlock() - - p.Logger.Debug("fetching cached elements", slog.Int("count", len(cacheData))) - - for key, data := range cacheData { - var collector, object string - - if keys := strings.Split(key, "."); len(keys) == 3 { - collector = keys[0] - object = keys[1] - } else { - continue - } - - // skip metadata - if strings.HasPrefix(object, "metadata_") { - continue - } - - metricNames := set.New() - for _, m := range data { - if x := strings.Split(string(m), "{"); len(x) >= 2 && x[0] != "" { - metricNames.Add(x[0]) - } - } - numMetrics += metricNames.Size() - - if _, exists := uniqueData[collector]; !exists { - uniqueData[collector] = make(map[string][]string) - numCollectors++ - } - if _, exists := uniqueData[collector][object]; !exists { - numObjects++ - } - uniqueData[collector][object] = metricNames.Values() - } + overview, err := p.aCache.getOverview() + if err != nil { + p.Logger.Error("failed to get cache statistics", slogx.Err(err)) + http.Error(w, "Failed to collect cache statistics", http.StatusInternalServerError) + return } + numCollectors = overview.NumCollectors + numObjects = 
overview.NumObjects + numMetrics = overview.NumMetrics + uniqueData = overview.UniqueData for col, perObject := range uniqueData { objects := make([]string, 0) @@ -339,7 +220,7 @@ func (p *Prometheus) ServeInfo(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Header().Set("Content-Type", "text/html") - _, err := w.Write([]byte(bodyFlat)) + _, err = w.Write([]byte(bodyFlat)) if err != nil { p.Logger.Error("write info", slogx.Err(err)) } diff --git a/cmd/exporters/prometheus/prometheus.go b/cmd/exporters/prometheus/prometheus.go index d9b9bbfdd..a4e22f14e 100644 --- a/cmd/exporters/prometheus/prometheus.go +++ b/cmd/exporters/prometheus/prometheus.go @@ -49,8 +49,7 @@ const ( type Prometheus struct { *exporter.AbstractExporter - memoryCache *cache - diskCache *diskCache + aCache cacher allowAddrs []string allowAddrsRegex []*regexp.Regexp cacheAddrs map[string]bool @@ -58,37 +57,26 @@ type Prometheus struct { addMetaTags bool globalPrefix string replacer *strings.Replacer - useDiskCache bool } func New(abc *exporter.AbstractExporter) exporter.Exporter { return &Prometheus{AbstractExporter: abc} } -func (p *Prometheus) createCache(d time.Duration) { - if p.useDiskCache { - // Path is mandatory when disk cache is enabled - if p.Params.DiskCache == nil || p.Params.DiskCache.Path == "" { - p.Logger.Error("disk cache enabled but path is not specified") - return - } +func (p *Prometheus) createCacher(dur time.Duration) cacher { + if p.Params.DiskCache != nil && p.Params.DiskCache.Path != "" { + p.Logger.Debug("disk cache enabled - will use disk-based caching for RSS optimization", + slog.String("path", p.Params.DiskCache.Path)) cacheDir := p.Params.DiskCache.Path - // Include poller name in cache directory to avoid collisions between multiple pollers if p.Options.Poller != "" { cacheDir = filepath.Join(cacheDir, p.Options.Poller) } - - p.diskCache = newDiskCache(d, cacheDir, p.Logger) - - if p.diskCache != nil { - p.Logger.Debug("disk cache configured", - slog.String("cacheDir", cacheDir)) - } - } else { - p.memoryCache = newCache(d) + return newDiskCache(dur, cacheDir, p.Logger) } + + return newMemCache(p.Logger, dur) } func (p *Prometheus) Init() error { @@ -128,36 +116,25 @@ func (p *Prometheus) Init() error { p.addMetaTags = true } - // Check if disk cache is enabled (path is mandatory) - if p.Params.DiskCache != nil && p.Params.DiskCache.Path != "" { - p.useDiskCache = true - p.Logger.Debug("disk cache enabled - will use disk-based caching for RSS optimization", - slog.String("path", p.Params.DiskCache.Path)) - } else { - p.useDiskCache = false - p.Logger.Debug("disk cache disabled - using memory-based caching") - } - - // all other parameters are only relevant to the HTTP daemon + maxKeep := cacheMaxKeep + var maxKeepDur time.Duration if x := p.Params.CacheMaxKeep; x != nil { - if d, err := time.ParseDuration(*x); err == nil { - p.Logger.Debug("using custom cache_max_keep", slog.String("cacheMaxKeep", *x)) - p.createCache(d) - } else { - p.Logger.Error("cache_max_keep", slogx.Err(err), slog.String("x", *x)) - } + maxKeep = *x + p.Logger.Debug("using custom cache_max_keep", slog.String("cacheMaxKeep", maxKeep)) } - - if p.memoryCache == nil && p.diskCache == nil { - p.Logger.Debug("using default cache_max_keep", slog.String("cacheMaxKeep", cacheMaxKeep)) - if d, err := time.ParseDuration(cacheMaxKeep); err == nil { - p.createCache(d) - } else { - return err - } + d, err := time.ParseDuration(maxKeep) + if err != nil { + p.Logger.Error("failed to use cache_max_keep 
duration. Using default", slogx.Err(err), + slog.String("maxKeep", maxKeep), + slog.String("default", cacheMaxKeep), + ) + maxKeepDur, _ = time.ParseDuration(cacheMaxKeep) + } else { + maxKeepDur = d } - if p.memoryCache == nil && p.diskCache == nil { + p.aCache = p.createCacher(maxKeepDur) + if !p.aCache.isValid() { return errs.New(errs.ErrInvalidParam, "cache initialization failed") } @@ -250,9 +227,10 @@ func newReplacer() *strings.Replacer { func (p *Prometheus) Export(data *matrix.Matrix) (exporter.Stats, error) { var ( - metrics [][]byte - stats exporter.Stats - err error + metrics [][]byte + stats exporter.Stats + err error + metricNames *set.Set ) // lock the exporter, to prevent other collectors from writing to us @@ -261,39 +239,15 @@ func (p *Prometheus) Export(data *matrix.Matrix) (exporter.Stats, error) { // render metrics into Prometheus format start := time.Now() - metrics, stats = p.render(data) + metrics, stats, metricNames = p.render(data) // fix render time for metadata d := time.Since(start) - // Extract metric names from matrix for cache statistics - var prefix string - if data.Object == "" { - prefix = strings.TrimSuffix(p.globalPrefix, "_") - } else { - prefix = p.globalPrefix + data.Object - } - - metricNames := set.New() - for _, metric := range data.GetMetrics() { - if metric.IsExportable() { - metricNames.Add(prefix + "_" + metric.GetName()) - } - } - // store metrics in cache key := data.UUID + "." + data.Object + "." + data.Identifier - // lock cache, to prevent HTTPd reading while we are mutating it - if p.useDiskCache { - p.diskCache.Lock() - p.diskCache.Put(key, metrics, metricNames) - p.diskCache.Unlock() - } else { - p.memoryCache.Lock() - p.memoryCache.Put(key, metrics, metricNames) - p.memoryCache.Unlock() - } + p.aCache.exportMetrics(key, metrics, metricNames) // update metadata p.AddExportCount(uint64(len(metrics))) @@ -325,7 +279,7 @@ func (p *Prometheus) Export(data *matrix.Matrix) (exporter.Stats, error) { // volume_read_ops{node="my-node",vol="some_vol"} 2523 // fcp_lif_read_ops{vserver="nas_svm",port_id="e02"} 771 -func (p *Prometheus) render(data *matrix.Matrix) ([][]byte, exporter.Stats) { +func (p *Prometheus) render(data *matrix.Matrix) ([][]byte, exporter.Stats, *set.Set) { var ( rendered [][]byte tagged *set.Set @@ -345,6 +299,7 @@ func (p *Prometheus) render(data *matrix.Matrix) ([][]byte, exporter.Stats) { buf.Grow(4096) globalLabels := make([]string, 0, len(data.GetGlobalLabels())) normalizedLabels = make(map[string][]string) + metricNames := set.New() if p.addMetaTags { tagged = set.New() @@ -402,6 +357,7 @@ func (p *Prometheus) render(data *matrix.Matrix) ([][]byte, exporter.Stats) { if !metric.IsExportable() { continue } + metricNames.Add(prefix + "_" + metric.GetName()) exportableMetrics++ } @@ -739,7 +695,7 @@ func (p *Prometheus) render(data *matrix.Matrix) ([][]byte, exporter.Stats) { RenderedBytes: renderedBytes, } - return rendered, stats + return rendered, stats, metricNames } var numAndUnitRe = regexp.MustCompile(`(\d+)\s*(\w+)`) diff --git a/cmd/exporters/prometheus/prometheus_test.go b/cmd/exporters/prometheus/prometheus_test.go index a38dcb244..1e2f6f75b 100644 --- a/cmd/exporters/prometheus/prometheus_test.go +++ b/cmd/exporters/prometheus/prometheus_test.go @@ -17,37 +17,6 @@ import ( "github.com/netapp/harvest/v2/pkg/matrix" ) -func TestFilterMetaTags(t *testing.T) { - - example := [][]byte{ - []byte(`# HELP some_metric help text`), - []byte(`# TYPE some_metric type`), - []byte(`some_metric{node="node_1"} 0.0`), - 
[]byte(`# HELP some_other_metric help text`), - []byte(`# TYPE some_other_metric type`), - []byte(`some_other_metric{node="node_2"} 0.0`), - []byte(`# HELP some_other_metric DUPLICATE help text`), - []byte(`# TYPE some_other_metric type`), - []byte(`some_other_metric{node="node_3"} 0.0`), - } - - expected := `# HELP some_metric help text -# TYPE some_metric type -some_metric{node="node_1"} 0.0 -# HELP some_other_metric help text -# TYPE some_other_metric type -some_other_metric{node="node_2"} 0.0 -some_other_metric{node="node_3"} 0.0 -` - p := Prometheus{} - seen := make(map[string]struct{}) - var w strings.Builder - _ = p.writeMetrics(&w, example, seen) - - diff := cmp.Diff(w.String(), expected) - assert.Equal(t, diff, "") -} - func TestEscape(t *testing.T) { replacer := newReplacer() @@ -149,7 +118,7 @@ net_app_bike_max_speed{} 3`, "bike"}, prom := p.(*Prometheus) var lines []string - for _, metrics := range prom.memoryCache.Get() { + for _, metrics := range prom.aCache.(*memCache).Get() { for _, metric := range metrics { lines = append(lines, string(metric)) } @@ -187,7 +156,7 @@ netapp_change_log{category="metric",cluster="umeng-aff300-01-02",object="volume" prom := p.(*Prometheus) var lines []string - for _, metrics := range prom.memoryCache.Get() { + for _, metrics := range prom.aCache.(*memCache).Get() { for _, metric := range metrics { lines = append(lines, string(metric)) } @@ -262,7 +231,7 @@ func TestRenderHistogramExample(t *testing.T) { prom := p.(*Prometheus) var lines []string - for _, metrics := range prom.memoryCache.Get() { + for _, metrics := range prom.aCache.(*memCache).Get() { for _, metricLine := range metrics { sline := string(metricLine) if !strings.HasPrefix(sline, "#") {
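For reviewers who want to exercise the new component in isolation, below is a minimal, hypothetical sketch — it is not part of the patch series above. It assumes an in-package (`package prometheus`) context so the unexported identifiers are reachable; the directory, key, metric line, and function name are illustrative only. Every call it makes (`newDiskCache`, `isValid`, `Put`, `streamToWriter`, `Shutdown`, `set.New`) is introduced or kept by the diffs above, in their final PATCH 5/5 form:

```go
package prometheus

import (
	"fmt"
	"log/slog"
	"os"
	"time"

	"github.com/netapp/harvest/v2/pkg/set"
)

// exampleDiskCacheLifecycle is a hypothetical sketch: it drives the disk
// cache the same way Export and ServeMetrics do, but single-threaded and
// with hard-coded sample data.
func exampleDiskCacheLifecycle() error {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	// newDiskCache wipes and recreates baseDir, then starts the background
	// goroutine that expires entries twice per expiry period.
	dc := newDiskCache(5*time.Minute, "/tmp/harvest-cache-example", logger)
	if !dc.isValid() {
		return fmt.Errorf("disk cache initialization failed")
	}
	defer dc.Shutdown() // stop the cleanup goroutine

	// Keys follow the UUID.Object.Identifier layout used by Export; the
	// metric names only feed the /info statistics (GetStats).
	names := set.New()
	names.Add("netapp_volume_read_ops")
	dc.Put("Rest.volume.0", [][]byte{
		[]byte(`netapp_volume_read_ops{vol="vol0"} 42`),
	}, names)

	// streamMetrics holds dc.mu while streaming every non-expired cache
	// file to the scrape response; mirror that here.
	dc.mu.Lock()
	defer dc.mu.Unlock()
	return dc.streamToWriter(os.Stdout)
}
```

As in `streamMetrics`, the sketch takes `dc.mu` before streaming: `streamToWriter` itself reads `dc.files` without locking, and the cleanup goroutine started by `newDiskCache` takes the same mutex in `Clean` when it expires entries and deletes their files.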