diff --git a/cmd/exporters/prometheus/cache.go b/cmd/exporters/prometheus/cache.go
index ae73864c8..9f5da5379 100644
--- a/cmd/exporters/prometheus/cache.go
+++ b/cmd/exporters/prometheus/cache.go
@@ -5,35 +5,188 @@
package prometheus
import (
+ "bytes"
+ "github.com/netapp/harvest/v2/pkg/set"
+ "github.com/netapp/harvest/v2/pkg/slogx"
+ "io"
+ "log/slog"
+ "net/http"
+ "strings"
"sync"
"time"
)
-type cache struct {
- *sync.Mutex
+type cacher interface {
+ getOverview() (*CacheStats, error)
+ exportMetrics(key string, data [][]byte, names *set.Set)
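+	// streamMetrics writes metrics to w; a nil metrics slice means stream all cached metrics.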
+ streamMetrics(w http.ResponseWriter, seen map[string]struct{}, metrics [][]byte) (int, error)
+ isValid() bool
+}
+
+type memCache struct {
+ mu *sync.Mutex
+ logger *slog.Logger
data map[string][][]byte
timers map[string]time.Time
expire time.Duration
}
-func newCache(d time.Duration) *cache {
- c := cache{Mutex: &sync.Mutex{}, expire: d}
+func (c *memCache) isValid() bool {
+ return true
+}
+
+func (c *memCache) getOverview() (*CacheStats, error) {
+ c.mu.Lock()
+ cacheData := make(map[string][][]byte)
+ for key, data := range c.Get() {
+ cacheData[key] = make([][]byte, len(data))
+ copy(cacheData[key], data)
+ }
+ c.mu.Unlock()
+
+ stats := &CacheStats{
+ UniqueData: make(map[string]map[string][]string),
+ }
+
+ for key, data := range cacheData {
+ var collector, object string
+
+ if keys := strings.Split(key, "."); len(keys) == 3 {
+ collector = keys[0]
+ object = keys[1]
+ } else {
+ continue
+ }
+
+ // skip metadata
+ if strings.HasPrefix(object, "metadata_") {
+ continue
+ }
+
+ metricNames := set.New()
+ for _, m := range data {
+ if x := strings.Split(string(m), "{"); len(x) >= 2 && x[0] != "" {
+ metricNames.Add(x[0])
+ }
+ }
+ stats.NumMetrics += metricNames.Size()
+
+ if _, exists := stats.UniqueData[collector]; !exists {
+ stats.UniqueData[collector] = make(map[string][]string)
+ stats.NumCollectors++
+ }
+ if _, exists := stats.UniqueData[collector][object]; !exists {
+ stats.NumObjects++
+ }
+ stats.UniqueData[collector][object] = metricNames.Values()
+ }
+
+ return stats, nil
+}
+
+func (c *memCache) exportMetrics(key string, data [][]byte, metricNames *set.Set) {
+ c.Put(key, data, metricNames)
+}
+
+func (c *memCache) streamMetrics(w http.ResponseWriter, tagsSeen map[string]struct{}, metrics [][]byte) (int, error) {
+ c.mu.Lock()
+ var count int
+ if metrics == nil {
+ // stream all cached metrics
+		for _, cached := range c.Get() {
+			count += c.writeMetrics(w, cached, tagsSeen)
+ }
+ } else {
+ // stream only provided metrics
+ count += c.writeMetrics(w, metrics, tagsSeen)
+ }
+
+ c.mu.Unlock()
+
+ return count, nil
+}
+
+// writeMetrics writes metrics to the writer, skipping duplicates.
+// Normally Render() only adds one TYPE/HELP for each metric type.
+// Some metric types (e.g., metadata_collector_metrics) are submitted from multiple collectors.
+// That causes duplicates that are suppressed in this function.
+// The seen map is used to keep track of which metrics have been added.
+func (c *memCache) writeMetrics(w io.Writer, metrics [][]byte, tagsSeen map[string]struct{}) int {
+
+ var count int
+
+ for i := 0; i < len(metrics); i++ {
+ metric := metrics[i]
+ if bytes.HasPrefix(metric, []byte("# ")) {
+
+ // Find the metric name and check if it has been seen before
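+			// Meta lines look like "# HELP metric_name help text",
+			// so the name sits between the second and third space.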
+ var (
+ spacesSeen int
+ space2Index int
+ )
+
+ for j := range metric {
+ if metric[j] == ' ' {
+ spacesSeen++
+ if spacesSeen == 2 {
+ space2Index = j
+ } else if spacesSeen == 3 {
+ name := string(metric[space2Index+1 : j])
+ if _, ok := tagsSeen[name]; !ok {
+ tagsSeen[name] = struct{}{}
+ c.writeMetric(w, metric)
+ count++
+ if i+1 < len(metrics) {
+ c.writeMetric(w, metrics[i+1])
+ count++
+ i++
+ }
+ }
+ break
+ }
+ }
+ }
+ } else {
+ c.writeMetric(w, metric)
+ count++
+ }
+ }
+
+ return count
+}
+
+func (c *memCache) writeMetric(w io.Writer, data []byte) {
+ _, err := w.Write(data)
+	if err != nil {
+		c.logger.Error("write metrics", slogx.Err(err))
+		return
+	}
+ _, err = w.Write([]byte("\n"))
+ if err != nil {
+ c.logger.Error("write newline", slogx.Err(err))
+ }
+}
+
+func newMemCache(l *slog.Logger, d time.Duration) *memCache {
+ c := memCache{mu: &sync.Mutex{}, expire: d, logger: l}
c.data = make(map[string][][]byte)
c.timers = make(map[string]time.Time)
return &c
}
-func (c *cache) Get() map[string][][]byte {
+func (c *memCache) Get() map[string][][]byte {
c.Clean()
return c.data
}
-func (c *cache) Put(key string, data [][]byte) {
+func (c *memCache) Put(key string, data [][]byte, _ *set.Set) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
c.data[key] = data
c.timers[key] = time.Now()
}
-func (c *cache) Clean() {
+func (c *memCache) Clean() {
for k, t := range c.timers {
if time.Since(t) > c.expire {
delete(c.timers, k)
diff --git a/cmd/exporters/prometheus/cache_test.go b/cmd/exporters/prometheus/cache_test.go
new file mode 100644
index 000000000..02033bf6f
--- /dev/null
+++ b/cmd/exporters/prometheus/cache_test.go
@@ -0,0 +1,38 @@
+package prometheus
+
+import (
+ "github.com/google/go-cmp/cmp"
+ "github.com/netapp/harvest/v2/assert"
+ "strings"
+ "testing"
+)
+
+func Test_memCache_writeMetrics(t *testing.T) {
+ example := [][]byte{
+ []byte(`# HELP some_metric help text`),
+ []byte(`# TYPE some_metric type`),
+ []byte(`some_metric{node="node_1"} 0.0`),
+ []byte(`# HELP some_other_metric help text`),
+ []byte(`# TYPE some_other_metric type`),
+ []byte(`some_other_metric{node="node_2"} 0.0`),
+ []byte(`# HELP some_other_metric DUPLICATE help text`),
+ []byte(`# TYPE some_other_metric type`),
+ []byte(`some_other_metric{node="node_3"} 0.0`),
+ }
+
+ expected := `# HELP some_metric help text
+# TYPE some_metric type
+some_metric{node="node_1"} 0.0
+# HELP some_other_metric help text
+# TYPE some_other_metric type
+some_other_metric{node="node_2"} 0.0
+some_other_metric{node="node_3"} 0.0
+`
+ m := memCache{}
+ seen := make(map[string]struct{})
+ var w strings.Builder
+ _ = m.writeMetrics(&w, example, seen)
+
+ diff := cmp.Diff(w.String(), expected)
+ assert.Equal(t, diff, "")
+}
diff --git a/cmd/exporters/prometheus/disk_cache.go b/cmd/exporters/prometheus/disk_cache.go
new file mode 100644
index 000000000..394e9751c
--- /dev/null
+++ b/cmd/exporters/prometheus/disk_cache.go
@@ -0,0 +1,378 @@
+package prometheus
+
+import (
+ "bufio"
+ "context"
+ "github.com/netapp/harvest/v2/pkg/set"
+ "github.com/netapp/harvest/v2/pkg/slogx"
+ "io"
+ "log/slog"
+ "net/http"
+ "os"
+ "path/filepath"
+ "strings"
+ "sync"
+ "time"
+)
+
+// CacheStats holds statistics about cached metrics
+type CacheStats struct {
+ NumCollectors int
+ NumObjects int
+ NumMetrics int
+ UniqueData map[string]map[string][]string
+}
+
+type diskCache struct {
+ mu *sync.Mutex
+ files map[string]string // key -> filepath
+ timers map[string]time.Time // key -> timestamp
+ metricNames map[string]*set.Set // key -> metric names
+ metricCounts map[string]int // key -> number of metric lines
+ expire time.Duration
+ baseDir string
+ logger *slog.Logger
+ ctx context.Context
+ cancel context.CancelFunc
+ writerPool *sync.Pool
+ readerPool *sync.Pool
+ keyReplacer *strings.Replacer
+}
+
+func (dc *diskCache) isValid() bool {
+ return dc != nil && dc.baseDir != ""
+}
+
+func (dc *diskCache) getOverview() (*CacheStats, error) {
+ dc.mu.Lock()
+ stats, err := dc.GetStats()
+ dc.mu.Unlock()
+ if err != nil {
+ return nil, err
+ }
+
+ return stats, nil
+}
+
+func (dc *diskCache) exportMetrics(key string, data [][]byte, metricNames *set.Set) {
+ dc.Put(key, data, metricNames)
+}
+
+func (dc *diskCache) streamMetrics(w http.ResponseWriter, _ map[string]struct{}, metrics [][]byte) (int, error) {
+	// the disk cache already contains and streams all cached metrics, including metadata, so a non-nil metrics slice is ignored
+ if metrics != nil {
+ return 0, nil
+ }
+ dc.mu.Lock()
+ defer dc.mu.Unlock()
+
+ err := dc.streamToWriter(w)
+ if err != nil {
+ return 0, err
+ }
+ return dc.GetMetricCount(), nil
+}
+
+func newDiskCache(d time.Duration, baseDir string, logger *slog.Logger) *diskCache {
+ if d <= 0 {
+ logger.Warn("invalid expire duration, using default 5 minutes", slog.Duration("provided", d))
+ d = 5 * time.Minute
+ }
+ if baseDir == "" {
+ logger.Warn("empty base directory provided")
+ return nil
+ }
+
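+	// Remove any files left over from a previous run; the cache directory is rebuilt from scratch on startup.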
+ _ = os.RemoveAll(baseDir)
+ if err := os.MkdirAll(baseDir, 0750); err != nil {
+ logger.Warn("failed to create cache directory", slogx.Err(err), slog.String("dir", baseDir))
+ return nil
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ dc := &diskCache{
+ mu: &sync.Mutex{},
+ files: make(map[string]string),
+ timers: make(map[string]time.Time),
+ metricNames: make(map[string]*set.Set),
+ metricCounts: make(map[string]int),
+ expire: d,
+ baseDir: baseDir,
+ logger: logger,
+ ctx: ctx,
+ cancel: cancel,
+ writerPool: &sync.Pool{
+ New: func() any {
+ return bufio.NewWriterSize(nil, 64*1024)
+ },
+ },
+ readerPool: &sync.Pool{
+ New: func() any {
+ return bufio.NewReaderSize(nil, 64*1024)
+ },
+ },
+ keyReplacer: strings.NewReplacer("/", "_", "\\", "_", ":", "_"),
+ }
+
+ go dc.cleanup()
+ return dc
+}
+
+// GetStats returns cache statistics.
+func (dc *diskCache) GetStats() (*CacheStats, error) {
+ stats := &CacheStats{
+ UniqueData: make(map[string]map[string][]string),
+ }
+
+ seenCollectors := make(map[string]struct{})
+ seenObjects := make(map[string]struct{})
+
+ for key := range dc.files {
+ if dc.isExpired(key) {
+ continue
+ }
+
+ parts := strings.Split(key, ".")
+ if len(parts) < 2 {
+ continue
+ }
+
+ collector := parts[0]
+ object := parts[1]
+
+ if strings.HasPrefix(object, "metadata_") {
+ continue
+ }
+
+ metricNames, exists := dc.metricNames[key]
+ if !exists || metricNames == nil || metricNames.Size() == 0 {
+ continue
+ }
+
+ stats.NumMetrics += metricNames.Size()
+
+ if _, exists := stats.UniqueData[collector]; !exists {
+ stats.UniqueData[collector] = make(map[string][]string)
+ seenCollectors[collector] = struct{}{}
+ }
+
+ objectKey := collector + "." + object
+ if _, exists := stats.UniqueData[collector][object]; !exists {
+ seenObjects[objectKey] = struct{}{}
+ }
+
+ stats.UniqueData[collector][object] = metricNames.Values()
+ }
+
+ stats.NumCollectors = len(seenCollectors)
+ stats.NumObjects = len(seenObjects)
+
+ return stats, nil
+}
+
+// GetMetricCount returns the total number of cached metrics.
+func (dc *diskCache) GetMetricCount() int {
+ count := 0
+ for key := range dc.files {
+ if dc.isExpired(key) {
+ continue
+ }
+ if metricCount, exists := dc.metricCounts[key]; exists {
+ count += metricCount
+ }
+ }
+ return count
+}
+
+// Put stores metrics to disk and updates cache metadata.
+func (dc *diskCache) Put(key string, data [][]byte, metricNames *set.Set) {
+ dc.mu.Lock()
+ defer dc.mu.Unlock()
+
+ filePath := dc.generateFilepath(key)
+
+ if err := dc.writeToDisk(filePath, data); err != nil {
+ dc.logger.Warn("failed to write cache file",
+ slogx.Err(err),
+ slog.String("key", key),
+ slog.String("file", filePath))
+ return
+ }
+
+ dc.files[key] = filePath
+ dc.timers[key] = time.Now()
+ if metricNames != nil && metricNames.Size() > 0 {
+ dc.metricNames[key] = metricNames
+ } else {
+ dc.metricNames[key] = nil
+ }
+ dc.metricCounts[key] = len(data)
+
+ dc.logger.Debug("cached metrics to disk",
+ slog.String("key", key),
+ slog.String("file", filePath),
+ slog.Int("metrics_count", len(data)))
+}
+
+// streamToWriter streams all non-expired cache files to the writer.
+func (dc *diskCache) streamToWriter(w io.Writer) error {
+ var resultErr error
+ errorCount := 0
+ totalCount := 0
+
+ for key, path := range dc.files {
+ if dc.isExpired(key) {
+ continue
+ }
+ totalCount++
+
+ if err := dc.streamFile(path, w); err != nil {
+ errorCount++
+ if resultErr == nil {
+ resultErr = err
+ }
+ dc.logger.Debug("failed to stream cache file",
+ slogx.Err(err), slog.String("file", path))
+ }
+ }
+
+ if resultErr != nil {
+ dc.logger.Warn("failed to stream some cache files",
+ slog.Int("failed_count", errorCount),
+ slog.Int("total_count", totalCount))
+ }
+ return resultErr
+}
+
+func (dc *diskCache) openFile(filePath string) (*os.File, error) {
+ file, err := os.Open(filePath)
+ if os.IsNotExist(err) {
+ return nil, nil
+ }
+ return file, err
+}
+
+func (dc *diskCache) closeFile(file *os.File) {
+ if err := file.Close(); err != nil {
+ dc.logger.Debug("failed to close file", slogx.Err(err))
+ }
+}
+
+func (dc *diskCache) streamFile(filePath string, w io.Writer) error {
+ file, err := dc.openFile(filePath)
+ if err != nil {
+ return err
+ }
+ if file == nil {
+		dc.logger.Debug("cache file does not exist", slog.String("filePath", filePath))
+ return nil
+ }
+ defer dc.closeFile(file)
+
+ reader := dc.readerPool.Get().(*bufio.Reader)
+ reader.Reset(file)
+ defer dc.readerPool.Put(reader)
+
+ _, err = io.Copy(w, reader)
+ return err
+}
+
+func (dc *diskCache) Clean() {
+ dc.mu.Lock()
+ defer dc.mu.Unlock()
+
+ for key, timestamp := range dc.timers {
+ if time.Since(timestamp) <= dc.expire {
+ continue
+ }
+ filePath := dc.files[key]
+
+ delete(dc.files, key)
+ delete(dc.timers, key)
+ delete(dc.metricNames, key)
+ delete(dc.metricCounts, key)
+
+ if err := os.Remove(filePath); err != nil && !os.IsNotExist(err) {
+ dc.logger.Debug("failed to remove expired cache file",
+ slogx.Err(err),
+ slog.String("file", filePath))
+ }
+
+ dc.logger.Debug("expired cache entry", slog.String("key", key))
+ }
+
+ entries, err := os.ReadDir(dc.baseDir)
+ if err != nil {
+ dc.logger.Debug("failed to read cache directory", slogx.Err(err), slog.String("baseDir", dc.baseDir))
+ return
+ }
+
+ knownFiles := make(map[string]struct{}, len(dc.files))
+ for _, path := range dc.files {
+ knownFiles[path] = struct{}{}
+ }
+
+ for _, entry := range entries {
+ fullPath := filepath.Join(dc.baseDir, entry.Name())
+
+ if _, found := knownFiles[fullPath]; !found {
+ _ = os.Remove(fullPath)
+ }
+ }
+}
+
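+// generateFilepath maps a cache key to a flat file name inside baseDir, replacing
+// characters that are unsafe in file names (path separators and colons).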
+func (dc *diskCache) generateFilepath(key string) string {
+ safeKey := dc.keyReplacer.Replace(key)
+ return filepath.Join(dc.baseDir, safeKey+".metrics")
+}
+
+func (dc *diskCache) writeToDisk(filePath string, data [][]byte) error {
+ file, err := os.Create(filePath)
+ if err != nil {
+ return err
+ }
+ defer dc.closeFile(file)
+
+ writer := dc.writerPool.Get().(*bufio.Writer)
+ writer.Reset(file)
+ defer dc.writerPool.Put(writer)
+
+ for _, line := range data {
+ if _, err := writer.Write(line); err != nil {
+ return err
+ }
+ if err := writer.WriteByte('\n'); err != nil {
+ return err
+ }
+ }
+
+ return writer.Flush()
+}
+
+// isExpired checks if a key is expired.
+func (dc *diskCache) isExpired(key string) bool {
+ if timer, exists := dc.timers[key]; exists {
+ return time.Since(timer) >= dc.expire
+ }
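+	// keys without a recorded timestamp are treated as expired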
+ return true
+}
+
+func (dc *diskCache) cleanup() {
+ ticker := time.NewTicker(dc.expire / 2) // Clean twice per expiry period
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-dc.ctx.Done():
+ return
+ case <-ticker.C:
+ dc.Clean()
+ }
+ }
+}
+
+func (dc *diskCache) Shutdown() {
+ if dc.cancel != nil {
+ dc.cancel()
+ }
+}
diff --git a/cmd/exporters/prometheus/httpd.go b/cmd/exporters/prometheus/httpd.go
index 02f4a0ce8..f8f82e451 100644
--- a/cmd/exporters/prometheus/httpd.go
+++ b/cmd/exporters/prometheus/httpd.go
@@ -7,12 +7,8 @@
package prometheus
import (
- "bytes"
"errors"
"fmt"
- "github.com/netapp/harvest/v2/pkg/set"
- "github.com/netapp/harvest/v2/pkg/slogx"
- "io"
"log/slog"
"net"
"net/http"
@@ -22,6 +18,8 @@ import (
"strconv"
"strings"
"time"
+
+ "github.com/netapp/harvest/v2/pkg/slogx"
)
func (p *Prometheus) startHTTPD(addr string, port int) {
@@ -138,26 +136,27 @@ func (p *Prometheus) ServeMetrics(w http.ResponseWriter, r *http.Request) {
return
}
- p.cache.Lock()
- tagsSeen := make(map[string]struct{})
-
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
- for _, metrics := range p.cache.Get() {
- count += p.writeMetrics(w, metrics, tagsSeen)
+ tagsSeen := make(map[string]struct{})
+
+ _, err := p.aCache.streamMetrics(w, tagsSeen, nil)
+ if err != nil {
+ p.Logger.Error("failed to stream metrics", slogx.Err(err))
}
// serve our own metadata
// notice that some values are always taken from previous session
- md, _ := p.render(p.Metadata)
- count += p.writeMetrics(w, md, tagsSeen)
-
- p.cache.Unlock()
+ md, _, _ := p.render(p.Metadata)
+ _, err = p.aCache.streamMetrics(w, tagsSeen, md)
+ if err != nil {
+ p.Logger.Error("failed to stream metadata metrics", slogx.Err(err))
+ }
// update metadata
p.Metadata.Reset()
- err := p.Metadata.LazySetValueInt64("time", "http", time.Since(start).Microseconds())
+ err = p.Metadata.LazySetValueInt64("time", "http", time.Since(start).Microseconds())
if err != nil {
p.Logger.Error("metadata time", slogx.Err(err))
}
@@ -167,72 +166,18 @@ func (p *Prometheus) ServeMetrics(w http.ResponseWriter, r *http.Request) {
}
}
-// writeMetrics writes metrics to the writer, skipping duplicates.
-// Normally Render() only adds one TYPE/HELP for each metric type.
-// Some metric types (e.g., metadata_collector_metrics) are submitted from multiple collectors.
-// That causes duplicates that are suppressed in this function.
-// The seen map is used to keep track of which metrics have been added.
-func (p *Prometheus) writeMetrics(w io.Writer, metrics [][]byte, tagsSeen map[string]struct{}) int {
-
- var count int
-
- for i := 0; i < len(metrics); i++ {
- metric := metrics[i]
- if bytes.HasPrefix(metric, []byte("# ")) {
-
- // Find the metric name and check if it has been seen before
- var (
- spacesSeen int
- space2Index int
- )
-
- for j := range metric {
- if metric[j] == ' ' {
- spacesSeen++
- if spacesSeen == 2 {
- space2Index = j
- } else if spacesSeen == 3 {
- name := string(metric[space2Index+1 : j])
- if _, ok := tagsSeen[name]; !ok {
- tagsSeen[name] = struct{}{}
- p.writeMetric(w, metric)
- count++
- if i+1 < len(metrics) {
- p.writeMetric(w, metrics[i+1])
- count++
- i++
- }
- }
- break
- }
- }
- }
- } else {
- p.writeMetric(w, metric)
- count++
- }
- }
-
- return count
-}
-
-func (p *Prometheus) writeMetric(w io.Writer, data []byte) {
- _, err := w.Write(data)
- if err != nil {
- p.Logger.Error("write metrics", slogx.Err(err))
- return
- }
- _, err = w.Write([]byte("\n"))
- if err != nil {
- p.Logger.Error("write newline", slogx.Err(err))
- return
- }
-}
-
// ServeInfo provides a human-friendly overview of metric types and source collectors
// this is done in a very inefficient way, by "reverse engineering" the metrics.
// That's probably ok, since we don't expect this to be called often.
func (p *Prometheus) ServeInfo(w http.ResponseWriter, r *http.Request) {
+
+ var (
+ numCollectors int
+ numObjects int
+ numMetrics int
+ uniqueData map[string]map[string][]string
+ )
+
start := time.Now()
if !p.checkAddr(r.RemoteAddr) {
@@ -244,52 +189,16 @@ func (p *Prometheus) ServeInfo(w http.ResponseWriter, r *http.Request) {
body := make([]string, 0)
- numCollectors := 0
- numObjects := 0
- numMetrics := 0
-
- uniqueData := map[string]map[string][]string{}
-
- // copy cache so we don't lock it
- p.cache.Lock()
- cache := make(map[string][][]byte)
- for key, data := range p.cache.Get() {
- cache[key] = make([][]byte, len(data))
- copy(cache[key], data)
- }
- p.cache.Unlock()
-
- p.Logger.Debug("fetching cached elements", slog.Int("count", len(cache)))
-
- for key, data := range cache {
- var collector, object string
-
- if keys := strings.Split(key, "."); len(keys) == 3 {
- collector = keys[0]
- object = keys[1]
- } else {
- continue
- }
-
- // skip metadata
- if strings.HasPrefix(object, "metadata_") {
- continue
- }
-
- metricNames := set.New()
- for _, m := range data {
- if x := strings.Split(string(m), "{"); len(x) >= 2 && x[0] != "" {
- metricNames.Add(x[0])
- }
- }
- numMetrics += metricNames.Size()
-
- if _, exists := uniqueData[collector]; !exists {
- uniqueData[collector] = make(map[string][]string)
- }
- uniqueData[collector][object] = metricNames.Values()
-
+ overview, err := p.aCache.getOverview()
+ if err != nil {
+ p.Logger.Error("failed to get cache statistics", slogx.Err(err))
+ http.Error(w, "Failed to collect cache statistics", http.StatusInternalServerError)
+ return
}
+ numCollectors = overview.NumCollectors
+ numObjects = overview.NumObjects
+ numMetrics = overview.NumMetrics
+ uniqueData = overview.UniqueData
for col, perObject := range uniqueData {
objects := make([]string, 0)
@@ -301,11 +210,9 @@ func (p *Prometheus) ServeInfo(w http.ResponseWriter, r *http.Request) {
}
}
objects = append(objects, fmt.Sprintf(objectTemplate, obj, strings.Join(metrics, "\n")))
- numObjects++
}
body = append(body, fmt.Sprintf(collectorTemplate, col, strings.Join(objects, "\n")))
- numCollectors++
}
poller := p.Options.Poller
@@ -313,7 +220,7 @@ func (p *Prometheus) ServeInfo(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "text/html")
- _, err := w.Write([]byte(bodyFlat))
+ _, err = w.Write([]byte(bodyFlat))
if err != nil {
p.Logger.Error("write info", slogx.Err(err))
}
diff --git a/cmd/exporters/prometheus/prometheus.go b/cmd/exporters/prometheus/prometheus.go
index 9cc69f57f..a4e22f14e 100644
--- a/cmd/exporters/prometheus/prometheus.go
+++ b/cmd/exporters/prometheus/prometheus.go
@@ -30,6 +30,7 @@ import (
"github.com/netapp/harvest/v2/pkg/set"
"github.com/netapp/harvest/v2/pkg/slogx"
"log/slog"
+ "path/filepath"
"regexp"
"slices"
"sort"
@@ -48,7 +49,7 @@ const (
type Prometheus struct {
*exporter.AbstractExporter
- cache *cache
+ aCache cacher
allowAddrs []string
allowAddrsRegex []*regexp.Regexp
cacheAddrs map[string]bool
@@ -62,6 +63,22 @@ func New(abc *exporter.AbstractExporter) exporter.Exporter {
return &Prometheus{AbstractExporter: abc}
}
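+// createCacher picks the cache implementation: disk-backed when disk_cache.path is
+// configured, in-memory otherwise.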
+func (p *Prometheus) createCacher(dur time.Duration) cacher {
+ if p.Params.DiskCache != nil && p.Params.DiskCache.Path != "" {
+		p.Logger.Debug("disk cache enabled, using disk-based caching to reduce RSS",
+			slog.String("path", p.Params.DiskCache.Path))
+
+ cacheDir := p.Params.DiskCache.Path
+ // Include poller name in cache directory to avoid collisions between multiple pollers
+ if p.Options.Poller != "" {
+ cacheDir = filepath.Join(cacheDir, p.Options.Poller)
+ }
+ return newDiskCache(dur, cacheDir, p.Logger)
+ }
+
+ return newMemCache(p.Logger, dur)
+}
+
func (p *Prometheus) Init() error {
if err := p.InitAbc(); err != nil {
@@ -99,23 +116,26 @@ func (p *Prometheus) Init() error {
p.addMetaTags = true
}
- // all other parameters are only relevant to the HTTP daemon
+ maxKeep := cacheMaxKeep
+ var maxKeepDur time.Duration
if x := p.Params.CacheMaxKeep; x != nil {
- if d, err := time.ParseDuration(*x); err == nil {
- p.Logger.Debug("using custom cache_max_keep", slog.String("cacheMaxKeep", *x))
- p.cache = newCache(d)
- } else {
- p.Logger.Error("cache_max_keep", slogx.Err(err), slog.String("x", *x))
- }
+ maxKeep = *x
+ p.Logger.Debug("using custom cache_max_keep", slog.String("cacheMaxKeep", maxKeep))
+ }
+ d, err := time.ParseDuration(maxKeep)
+ if err != nil {
+		p.Logger.Error("failed to parse cache_max_keep duration, using default", slogx.Err(err),
+ slog.String("maxKeep", maxKeep),
+ slog.String("default", cacheMaxKeep),
+ )
+ maxKeepDur, _ = time.ParseDuration(cacheMaxKeep)
+ } else {
+ maxKeepDur = d
}
- if p.cache == nil {
- p.Logger.Debug("using default cache_max_keep", slog.String("cacheMaxKeep", cacheMaxKeep))
- if d, err := time.ParseDuration(cacheMaxKeep); err == nil {
- p.cache = newCache(d)
- } else {
- return err
- }
+ p.aCache = p.createCacher(maxKeepDur)
+ if !p.aCache.isValid() {
+ return errs.New(errs.ErrInvalidParam, "cache initialization failed")
}
// allow access to metrics only from the given plain addresses
@@ -207,9 +227,10 @@ func newReplacer() *strings.Replacer {
func (p *Prometheus) Export(data *matrix.Matrix) (exporter.Stats, error) {
var (
- metrics [][]byte
- stats exporter.Stats
- err error
+ metrics [][]byte
+ stats exporter.Stats
+ err error
+ metricNames *set.Set
)
// lock the exporter, to prevent other collectors from writing to us
@@ -218,7 +239,7 @@ func (p *Prometheus) Export(data *matrix.Matrix) (exporter.Stats, error) {
// render metrics into Prometheus format
start := time.Now()
- metrics, stats = p.render(data)
+ metrics, stats, metricNames = p.render(data)
// fix render time for metadata
d := time.Since(start)
@@ -226,10 +247,7 @@ func (p *Prometheus) Export(data *matrix.Matrix) (exporter.Stats, error) {
// store metrics in cache
key := data.UUID + "." + data.Object + "." + data.Identifier
- // lock cache, to prevent HTTPd reading while we are mutating it
- p.cache.Lock()
- p.cache.Put(key, metrics)
- p.cache.Unlock()
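+	// both cache implementations lock internally, so no exporter-side lock is needed here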
+ p.aCache.exportMetrics(key, metrics, metricNames)
// update metadata
p.AddExportCount(uint64(len(metrics)))
@@ -261,7 +279,7 @@ func (p *Prometheus) Export(data *matrix.Matrix) (exporter.Stats, error) {
// volume_read_ops{node="my-node",vol="some_vol"} 2523
// fcp_lif_read_ops{vserver="nas_svm",port_id="e02"} 771
-func (p *Prometheus) render(data *matrix.Matrix) ([][]byte, exporter.Stats) {
+func (p *Prometheus) render(data *matrix.Matrix) ([][]byte, exporter.Stats, *set.Set) {
var (
rendered [][]byte
tagged *set.Set
@@ -281,6 +299,7 @@ func (p *Prometheus) render(data *matrix.Matrix) ([][]byte, exporter.Stats) {
buf.Grow(4096)
globalLabels := make([]string, 0, len(data.GetGlobalLabels()))
normalizedLabels = make(map[string][]string)
+ metricNames := set.New()
if p.addMetaTags {
tagged = set.New()
@@ -338,6 +357,7 @@ func (p *Prometheus) render(data *matrix.Matrix) ([][]byte, exporter.Stats) {
if !metric.IsExportable() {
continue
}
+ metricNames.Add(prefix + "_" + metric.GetName())
exportableMetrics++
}
@@ -506,7 +526,21 @@ func (p *Prometheus) render(data *matrix.Matrix) ([][]byte, exporter.Stats) {
if p.Params.SortLabels {
sort.Strings(metricLabels)
}
- x := prefix + "_" + metric.GetName() + "{" + joinedKeys + "," + strings.Join(metricLabels, ",") + "} " + value
+
+ buf.Reset()
+ buf.WriteString(prefix)
+ buf.WriteString("_")
+ buf.WriteString(metric.GetName())
+ buf.WriteString("{")
+ buf.WriteString(joinedKeys)
+ buf.WriteString(",")
+ buf.WriteString(strings.Join(metricLabels, ","))
+ buf.WriteString("} ")
+ buf.WriteString(value)
+
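+		// buf is reused for the next metric line, so copy its bytes before keeping a reference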
+ xbr := buf.Bytes()
+ metricLine := make([]byte, len(xbr))
+ copy(metricLine, xbr)
prefixedName := prefix + "_" + metric.GetName()
if tagged != nil && !tagged.Has(prefixedName) {
@@ -517,8 +551,8 @@ func (p *Prometheus) render(data *matrix.Matrix) ([][]byte, exporter.Stats) {
renderedBytes += uint64(len(help)) + uint64(len(typeT))
}
- rendered = append(rendered, []byte(x))
- renderedBytes += uint64(len(x))
+ rendered = append(rendered, metricLine)
+ renderedBytes += uint64(len(metricLine))
// scalar metric
} else {
buf.Reset()
@@ -651,13 +685,17 @@ func (p *Prometheus) render(data *matrix.Matrix) ([][]byte, exporter.Stats) {
}
}
+	// Both the memory and disk caches emit a newline after each metric line
+	// (see writeMetric() and writeToDisk()), so account for those bytes here.
+ renderedBytes += uint64(len(rendered)) // Add 1 byte per line for '\n'
+
stats := exporter.Stats{
InstancesExported: instancesExported,
MetricsExported: uint64(len(rendered)),
RenderedBytes: renderedBytes,
}
- return rendered, stats
+ return rendered, stats, metricNames
}
var numAndUnitRe = regexp.MustCompile(`(\d+)\s*(\w+)`)
diff --git a/cmd/exporters/prometheus/prometheus_test.go b/cmd/exporters/prometheus/prometheus_test.go
index 16d268984..1e2f6f75b 100644
--- a/cmd/exporters/prometheus/prometheus_test.go
+++ b/cmd/exporters/prometheus/prometheus_test.go
@@ -5,48 +5,18 @@
package prometheus
import (
+ "slices"
+ "strings"
+ "testing"
+
"github.com/google/go-cmp/cmp"
"github.com/netapp/harvest/v2/assert"
"github.com/netapp/harvest/v2/cmd/poller/exporter"
"github.com/netapp/harvest/v2/cmd/poller/options"
"github.com/netapp/harvest/v2/pkg/conf"
"github.com/netapp/harvest/v2/pkg/matrix"
- "slices"
- "strings"
- "testing"
)
-func TestFilterMetaTags(t *testing.T) {
-
- example := [][]byte{
- []byte(`# HELP some_metric help text`),
- []byte(`# TYPE some_metric type`),
- []byte(`some_metric{node="node_1"} 0.0`),
- []byte(`# HELP some_other_metric help text`),
- []byte(`# TYPE some_other_metric type`),
- []byte(`some_other_metric{node="node_2"} 0.0`),
- []byte(`# HELP some_other_metric DUPLICATE help text`),
- []byte(`# TYPE some_other_metric type`),
- []byte(`some_other_metric{node="node_3"} 0.0`),
- }
-
- expected := `# HELP some_metric help text
-# TYPE some_metric type
-some_metric{node="node_1"} 0.0
-# HELP some_other_metric help text
-# TYPE some_other_metric type
-some_other_metric{node="node_2"} 0.0
-some_other_metric{node="node_3"} 0.0
-`
- p := Prometheus{}
- seen := make(map[string]struct{})
- var w strings.Builder
- _ = p.writeMetrics(&w, example, seen)
-
- diff := cmp.Diff(w.String(), expected)
- assert.Equal(t, diff, "")
-}
-
func TestEscape(t *testing.T) {
replacer := newReplacer()
@@ -147,7 +117,8 @@ net_app_bike_max_speed{} 3`, "bike"},
prom := p.(*Prometheus)
var lines []string
- for _, metrics := range prom.cache.Get() {
+
+ for _, metrics := range prom.aCache.(*memCache).Get() {
for _, metric := range metrics {
lines = append(lines, string(metric))
}
@@ -184,7 +155,8 @@ netapp_change_log{category="metric",cluster="umeng-aff300-01-02",object="volume"
prom := p.(*Prometheus)
var lines []string
- for _, metrics := range prom.cache.Get() {
+
+ for _, metrics := range prom.aCache.(*memCache).Get() {
for _, metric := range metrics {
lines = append(lines, string(metric))
}
@@ -258,7 +230,8 @@ func TestRenderHistogramExample(t *testing.T) {
prom := p.(*Prometheus)
var lines []string
- for _, metrics := range prom.cache.Get() {
+
+ for _, metrics := range prom.aCache.(*memCache).Get() {
for _, metricLine := range metrics {
sline := string(metricLine)
if !strings.HasPrefix(sline, "#") {
diff --git a/docs/prometheus-exporter.md b/docs/prometheus-exporter.md
index 683419bc5..b60738be2 100644
--- a/docs/prometheus-exporter.md
+++ b/docs/prometheus-exporter.md
@@ -58,6 +58,7 @@ An overview of all parameters:
| [`allow_addrs`](#allow_addrs) | list of strings, optional | allow access only if host matches any of the provided addresses | |
| [`allow_addrs_regex`](#allow_addrs_regex) | list of strings, optional | allow access only if host address matches at least one of the regular expressions | |
| `cache_max_keep` | string (Go duration format), optional | maximum amount of time metrics are cached (in case Prometheus does not collect the metrics in time) | `5m` |
+| [`disk_cache`](#disk_cache) | object, optional | disk-based cache configuration | |
| `global_prefix` | string, optional | add a prefix to all metrics (e.g. `netapp_`) | |
| `local_http_addr` | string, optional | address of the HTTP server Harvest starts for Prometheus to scrape:
use `localhost` to serve only on the local machine
use `0.0.0.0` (default) if Prometheus is scraping from another machine | `0.0.0.0` |
| `port_range` | int-int (range), overrides `port` if specified | lower port to upper port (inclusive) of the HTTP end-point to create when a poller specifies this exporter. Starting at lower port, each free port will be tried sequentially up to the upper port. | |
@@ -221,6 +222,45 @@ Exporters:
Access will only be allowed from the IP4 range `192.168.0.0`-`192.168.0.255`.
+### disk_cache
+The `disk_cache` parameter enables disk-based staging of metrics before they are served to Prometheus. Instead of keeping formatted metrics in memory, Harvest writes them to files on disk. When Prometheus scrapes the `/metrics` endpoint, Harvest streams these cached files directly from disk. This reduces memory overhead, which is especially useful for large deployments with many metrics.
+
+**Configuration:**
+
+The `disk_cache` parameter takes a single **mandatory** `path` field that specifies the directory where cache files are stored.
+
+**Notes:**
+
+- The `path` is **required** when using `disk_cache`
+- Harvest will automatically create a subdirectory for each poller to avoid conflicts between multiple pollers
+- The cache directory is cleared on startup
+- Ensure the specified directory is writable by the Harvest process
+
+**Example:**
+
+```yaml
+Exporters:
+ prom_disk:
+ exporter: Prometheus
+ port_range: 13000-13100
+ disk_cache:
+ path: /var/lib/harvest/cache
+
+Pollers:
+ cluster-01:
+ addr: 10.0.1.1
+ exporters:
+ - prom_disk
+ cluster-02:
+ addr: 10.0.1.2
+ exporters:
+ - prom_disk
+```
+
+In this example, cache files will be created in:
+- `/var/lib/harvest/cache/cluster-01/`
+- `/var/lib/harvest/cache/cluster-02/`
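+
+Cache files are written with a `.metrics` extension and are removed automatically once they are older than `cache_max_keep`.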
+
## Configure Prometheus to scrape Harvest pollers
There are two ways to tell Prometheus how to scrape Harvest: using HTTP service discovery (SD) or listing each poller
diff --git a/pkg/conf/conf.go b/pkg/conf/conf.go
index 2a4949d96..c65fa96a0 100644
--- a/pkg/conf/conf.go
+++ b/pkg/conf/conf.go
@@ -785,6 +785,10 @@ func ZapiPoller(n *node.Node) *Poller {
return &p
}
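+// DiskCacheConfig holds the disk_cache settings for the Prometheus exporter;
+// Path is the directory where cache files are written.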
+type DiskCacheConfig struct {
+ Path string `yaml:"path"`
+}
+
type Exporter struct {
Port *int `yaml:"port,omitempty"`
PortRange *IntRange `yaml:"port_range,omitempty"`
@@ -804,12 +808,13 @@ type Exporter struct {
TLS TLS `yaml:"tls,omitempty"`
// InfluxDB specific
- Bucket *string `yaml:"bucket,omitempty"`
- Org *string `yaml:"org,omitempty"`
- Token *string `yaml:"token,omitempty"`
- Precision *string `yaml:"precision,omitempty"`
- ClientTimeout *string `yaml:"client_timeout,omitempty"`
- Version *string `yaml:"version,omitempty"`
+ Bucket *string `yaml:"bucket,omitempty"`
+ Org *string `yaml:"org,omitempty"`
+ Token *string `yaml:"token,omitempty"`
+ Precision *string `yaml:"precision,omitempty"`
+ ClientTimeout *string `yaml:"client_timeout,omitempty"`
+ Version *string `yaml:"version,omitempty"`
+ DiskCache *DiskCacheConfig `yaml:"disk_cache,omitempty"`
IsTest bool `yaml:"-"` // true when run from unit tests
IsEmbedded bool `yaml:"-"` // true when the exporter is embedded in a poller