cmd/exporters/prometheus/cache.go (159 additions, 7 deletions)
@@ -5,36 +5,188 @@
package prometheus

import (
"bytes"
"github.com/netapp/harvest/v2/pkg/set"
"github.com/netapp/harvest/v2/pkg/slogx"
"io"
"log/slog"
"net/http"
"strings"
"sync"
"time"
)

-type cache struct {
-*sync.Mutex
type cacher interface {
getOverview() (*CacheStats, error)
exportMetrics(key string, data [][]byte, names *set.Set)
streamMetrics(w http.ResponseWriter, seen map[string]struct{}, metrics [][]byte) (int, error)
isValid() bool
}
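// Contract note: a nil metrics slice passed to streamMetrics means
// "stream everything in the cache"; a non-nil slice restricts streaming
// to the given pre-rendered lines. The disk cache ignores such partial
// requests (see its streamMetrics implementation below).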

type memCache struct {
mu *sync.Mutex
logger *slog.Logger
data map[string][][]byte
timers map[string]time.Time
expire time.Duration
}

-func newCache(d time.Duration) *cache {
-c := cache{Mutex: &sync.Mutex{}, expire: d}
func (c *memCache) isValid() bool {
return true
}

func (c *memCache) getOverview() (*CacheStats, error) {
c.mu.Lock()
cacheData := make(map[string][][]byte)
for key, data := range c.Get() {
cacheData[key] = make([][]byte, len(data))
copy(cacheData[key], data)
}
c.mu.Unlock()

stats := &CacheStats{
UniqueData: make(map[string]map[string][]string),
}

for key, data := range cacheData {
var collector, object string

if keys := strings.Split(key, "."); len(keys) == 3 {
collector = keys[0]
object = keys[1]
} else {
continue
}

// skip metadata
if strings.HasPrefix(object, "metadata_") {
continue
}

metricNames := set.New()
for _, m := range data {
if x := strings.Split(string(m), "{"); len(x) >= 2 && x[0] != "" {
metricNames.Add(x[0])
}
}
stats.NumMetrics += metricNames.Size()

if _, exists := stats.UniqueData[collector]; !exists {
stats.UniqueData[collector] = make(map[string][]string)
stats.NumCollectors++
}
if _, exists := stats.UniqueData[collector][object]; !exists {
stats.NumObjects++
}
stats.UniqueData[collector][object] = metricNames.Values()
}

return stats, nil
}
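// Illustrative key layout assumed by getOverview (the third component
// is an assumption; only the first two are read): "Zapi.volume.<uuid>"
// yields collector "Zapi" and object "volume", while a key such as
// "Rest.metadata_target.<uuid>" is skipped because its object begins
// with "metadata_".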

func (c *memCache) exportMetrics(key string, data [][]byte, metricNames *set.Set) {
c.Put(key, data, metricNames)
}

func (c *memCache) streamMetrics(w http.ResponseWriter, tagsSeen map[string]struct{}, metrics [][]byte) (int, error) {
c.mu.Lock()
var count int
if metrics == nil {
// stream all cached metrics
for _, cached := range c.Get() {
count += c.writeMetrics(w, cached, tagsSeen)
}
} else {
// stream only provided metrics
count += c.writeMetrics(w, metrics, tagsSeen)
}

c.mu.Unlock()

return count, nil
}

// writeMetrics writes metrics to the writer, skipping duplicates.
// Normally Render() only adds one TYPE/HELP for each metric type.
// Some metric types (e.g., metadata_collector_metrics) are submitted from multiple collectors.
// That causes duplicates that are suppressed in this function.
// The seen map is used to keep track of which metrics have been added.
func (c *memCache) writeMetrics(w io.Writer, metrics [][]byte, tagsSeen map[string]struct{}) int {

var count int

for i := 0; i < len(metrics); i++ {
metric := metrics[i]
if bytes.HasPrefix(metric, []byte("# ")) {

// Find the metric name and check if it has been seen before
var (
spacesSeen int
space2Index int
)

for j := range metric {
if metric[j] == ' ' {
spacesSeen++
if spacesSeen == 2 {
space2Index = j
} else if spacesSeen == 3 {
name := string(metric[space2Index+1 : j])
if _, ok := tagsSeen[name]; !ok {
tagsSeen[name] = struct{}{}
c.writeMetric(w, metric)
count++
if i+1 < len(metrics) {
c.writeMetric(w, metrics[i+1])
count++
i++
}
}
break
}
}
}
} else {
c.writeMetric(w, metric)
count++
}
}

return count
}
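// Worked example of the name extraction above: in the line
// "# HELP some_metric help text", the second space is the one before
// "some_metric" and the third the one after it, so name == "some_metric".
// The HELP line and the TYPE line that immediately follows it are written
// once per name; later duplicates are suppressed via tagsSeen.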

func (c *memCache) writeMetric(w io.Writer, data []byte) {
_, err := w.Write(data)
if err != nil {
c.logger.Error("write metrics", slogx.Err(err))
}
_, err = w.Write([]byte("\n"))
if err != nil {
c.logger.Error("write newline", slogx.Err(err))
}
}

func newMemCache(l *slog.Logger, d time.Duration) *memCache {
c := memCache{mu: &sync.Mutex{}, expire: d, logger: l}
c.data = make(map[string][][]byte)
c.timers = make(map[string]time.Time)
return &c
}
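// Illustrative usage (hypothetical call sites; not part of this change):
//
//	c := newMemCache(slog.Default(), 5*time.Minute)
//	c.exportMetrics("zapi.volume.<uuid>", rendered, nil)   // memCache ignores the names set
//	n, _ := c.streamMetrics(w, map[string]struct{}{}, nil) // nil metrics: stream the whole cache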

-func (c *cache) Get() map[string][][]byte {
func (c *memCache) Get() map[string][][]byte {
c.Clean()
return c.data
}

-func (c *cache) Put(key string, data [][]byte, _ *set.Set) {
func (c *memCache) Put(key string, data [][]byte, _ *set.Set) {
c.mu.Lock()
defer c.mu.Unlock()

c.data[key] = data
c.timers[key] = time.Now()
}

-func (c *cache) Clean() {
func (c *memCache) Clean() {
for k, t := range c.timers {
if time.Since(t) > c.expire {
delete(c.timers, k)
…
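With both backends implementing cacher, the exporter can pick one at startup. A minimal sketch, assuming a useDisk flag and cache directory come from the exporter's configuration (newCacher is a hypothetical helper, not part of this PR):

func newCacher(useDisk bool, dir string, expire time.Duration, l *slog.Logger) cacher {
	if useDisk {
		return newDiskCache(expire, dir, l)
	}
	return newMemCache(l, expire)
}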
cmd/exporters/prometheus/cache_test.go (38 additions, 0 deletions)
@@ -0,0 +1,38 @@
package prometheus

import (
"github.com/google/go-cmp/cmp"
"github.com/netapp/harvest/v2/assert"
"strings"
"testing"
)

func Test_memcache_streamMetrics(t *testing.T) {
example := [][]byte{
[]byte(`# HELP some_metric help text`),
[]byte(`# TYPE some_metric type`),
[]byte(`some_metric{node="node_1"} 0.0`),
[]byte(`# HELP some_other_metric help text`),
[]byte(`# TYPE some_other_metric type`),
[]byte(`some_other_metric{node="node_2"} 0.0`),
[]byte(`# HELP some_other_metric DUPLICATE help text`),
[]byte(`# TYPE some_other_metric type`),
[]byte(`some_other_metric{node="node_3"} 0.0`),
}

expected := `# HELP some_metric help text
# TYPE some_metric type
some_metric{node="node_1"} 0.0
# HELP some_other_metric help text
# TYPE some_other_metric type
some_other_metric{node="node_2"} 0.0
some_other_metric{node="node_3"} 0.0
`
m := memCache{}
seen := make(map[string]struct{})
var w strings.Builder
_ = m.writeMetrics(&w, example, seen)

diff := cmp.Diff(w.String(), expected)
assert.Equal(t, diff, "")
}
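The test drives writeMetrics directly through a strings.Builder, so no HTTP handler is needed; it can be run in isolation with, e.g., go test ./cmd/exporters/prometheus -run Test_memcache_streamMetrics.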
cmd/exporters/prometheus/disk_cache.go (44 additions, 6 deletions)
@@ -7,6 +7,7 @@ import (
"github.com/netapp/harvest/v2/pkg/slogx"
"io"
"log/slog"
"net/http"
"os"
"path/filepath"
"strings"
@@ -23,7 +24,7 @@ type CacheStats struct {
}

type diskCache struct {
-*sync.Mutex
mu *sync.Mutex
files map[string]string // key -> filepath
timers map[string]time.Time // key -> timestamp
metricNames map[string]*set.Set // key -> metric names
@@ -38,6 +39,40 @@ type diskCache struct {
keyReplacer *strings.Replacer
}

func (dc *diskCache) isValid() bool {
return dc != nil && dc.baseDir != ""
}

func (dc *diskCache) getOverview() (*CacheStats, error) {
dc.mu.Lock()
stats, err := dc.GetStats()
dc.mu.Unlock()
if err != nil {
return nil, err
}

return stats, nil
}

func (dc *diskCache) exportMetrics(key string, data [][]byte, metricNames *set.Set) {
dc.Put(key, data, metricNames)
}

func (dc *diskCache) streamMetrics(w http.ResponseWriter, _ map[string]struct{}, metrics [][]byte) (int, error) {
// the disk cache always streams its entire contents, including metadata,
// so partial streaming requests (a non-nil metrics slice) are ignored
if metrics != nil {
return 0, nil
}
dc.mu.Lock()
defer dc.mu.Unlock()

err := dc.streamToWriter(w)
if err != nil {
return 0, err
}
return dc.GetMetricCount(), nil
}

func newDiskCache(d time.Duration, baseDir string, logger *slog.Logger) *diskCache {
if d <= 0 {
logger.Warn("invalid expire duration, using default 5 minutes", slog.Duration("provided", d))
@@ -56,7 +91,7 @@ func newDiskCache(d time.Duration, baseDir string, logger *slog.Logger) *diskCache

ctx, cancel := context.WithCancel(context.Background())
dc := &diskCache{
-Mutex: &sync.Mutex{},
mu: &sync.Mutex{},
files: make(map[string]string),
timers: make(map[string]time.Time),
metricNames: make(map[string]*set.Set),
@@ -151,6 +186,9 @@ func (dc *diskCache) GetMetricCount() int {

// Put stores metrics to disk and updates cache metadata.
func (dc *diskCache) Put(key string, data [][]byte, metricNames *set.Set) {
dc.mu.Lock()
defer dc.mu.Unlock()

filePath := dc.generateFilepath(key)

if err := dc.writeToDisk(filePath, data); err != nil {
@@ -176,8 +214,8 @@ func (dc *diskCache) Put(key string, data [][]byte, metricNames *set.Set) {
slog.Int("metrics_count", len(data)))
}

-// StreamToWriter streams all non-expired cache files to the writer.
-func (dc *diskCache) StreamToWriter(w io.Writer) error {
// streamToWriter streams all non-expired cache files to the writer.
func (dc *diskCache) streamToWriter(w io.Writer) error {
var resultErr error
errorCount := 0
totalCount := 0
@@ -240,8 +278,8 @@ func (dc *diskCache) streamFile(filePath string, w io.Writer) error {
}

func (dc *diskCache) Clean() {
-dc.Lock()
-defer dc.Unlock()
dc.mu.Lock()
defer dc.mu.Unlock()

for key, timestamp := range dc.timers {
if time.Since(timestamp) <= dc.expire {
…
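A note on the recurring *sync.Mutex → mu change in both caches: embedding *sync.Mutex promotes Lock and Unlock into the type's exported method set, so code outside the type can lock it directly; a named mu field keeps locking an internal detail. A minimal sketch of the pattern, with illustrative type names:

type counter struct {
	mu sync.Mutex // named field: Lock/Unlock are not part of counter's API
	n  int
}

func (c *counter) inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n++
}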