Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions .pipelines/test.pipeline.dtemplate.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[settings]
id = "test.pipeline.dtemplate"
lines = 2
run = false
run = true
buffer = 5

[[inputs]]
Expand All @@ -19,14 +19,19 @@
[[processors]]
[processors.defaults.labels]
tmplabel = '{{ .RoutingKey }}-{{ .Timestamp.Format "2006-01-02" }}'
[processors.defaults.fields]
tmpfield = '{{ .RoutingKey }}-{{ .Timestamp.Format "2006-01-02" }}'

[[processors]]
[processors.dynamic_template]
enable_metrics = true
template_ttl = "1m"
labels = [ "tmplabel" ]
fields = [ "annotations", "legend", "reason", "number" ]
fields = [ "tmpfield" ]

[[processors]]
[processors.stats]
enable_metrics = true
mode = "shared"
period = "15s"
metric_ttl = "1m"
Expand Down
2 changes: 1 addition & 1 deletion .pipelines/test.pipeline.mixer.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[settings]
id = "test.pipeline.mixer"
lines = 8
run = true
run = false
buffer = 5
log_level = "info"

Expand Down
7 changes: 4 additions & 3 deletions docs/METRICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -936,11 +936,12 @@ Labels:
- **plugin_name** - plugin name (alias)
- **driver** - driver name

### Stats Metrics Cache
### Internal Caches

#### Gauge `plugin_stats_metrics_cached`
Number of cached metrics.
#### Gauge `plugin_cache_size`
Number of cached objects.

Labels:
- **pipeline** - pipeline Id
- **plugin_name** - plugin name (alias)
- **items** - what kind of objects stored in cache
79 changes: 79 additions & 0 deletions plugins/common/metrics/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package metrics

import (
"fmt"
"sync"

"github.com/gekatateam/neptunus/metrics"
)

var (
pluginCacheSize = func(d cacheDescriptor) string {
return fmt.Sprintf("plugin_cache_size{pipeline=%q,plugin_name=%q,items=%q}", d.pipeline, d.pluginName, d.cacheItems)
}
)

type sizer interface {
Size() int
}

var (
cacheMetricsRegister = &sync.Once{}
cacheMetricsCollector = &cacheCollector{
stats: make(map[cacheDescriptor]sizer),
mu: &sync.Mutex{},
}
)

type cacheDescriptor struct {
pipeline string
pluginName string
cacheItems string
}

type cacheCollector struct {
stats map[cacheDescriptor]sizer
mu *sync.Mutex
}

func (c *cacheCollector) append(d cacheDescriptor, sc sizer) {
c.mu.Lock()
defer c.mu.Unlock()
c.stats[d] = sc
}

func (c *cacheCollector) delete(d cacheDescriptor) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.stats, d)
metrics.PluginsSet.UnregisterMetric(pluginCacheSize(d))
}

func (c *cacheCollector) Collect() {
c.mu.Lock()
defer c.mu.Unlock()

for d, sc := range c.stats {
metrics.PluginsSet.GetOrCreateGauge(pluginCacheSize(d), nil).Set(float64(sc.Size()))
}
}

func RegisterCache(pipeline, pluginName, items string, sc sizer) {
cacheMetricsRegister.Do(func() {
metrics.GlobalCollectorsRunner.Append(cacheMetricsCollector)
})

cacheMetricsCollector.append(cacheDescriptor{
pipeline: pipeline,
pluginName: pluginName,
cacheItems: items,
}, sc)
}

func UnregisterCache(pipeline, pluginName, items string) {
cacheMetricsCollector.delete(cacheDescriptor{
pipeline: pipeline,
pluginName: pluginName,
cacheItems: items,
})
}
76 changes: 0 additions & 76 deletions plugins/common/metrics/stats_cache.go

This file was deleted.

6 changes: 6 additions & 0 deletions plugins/processors/dynamic_template/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,16 @@ Plugin uses [wrapped events](../../common/template/README.md).

If template execution fails, event is marked as failed, but other templates execution continues.

> [!TIP]
> This plugin may write it's own [metrics](../../../docs/METRICS.md#internal-caches)

## Configuration
```toml
[[processors]]
[processors.dynamic_template]
# if true, plugin metrics cache length exposed as metric
enable_metrics = false

# compiled template TTL
template_ttl = "1h"

Expand Down
7 changes: 6 additions & 1 deletion plugins/processors/dynamic_template/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ type templateCache struct {
mu *sync.Mutex
}

func (c *templateCache) Size() int {
return len(c.m)
}

func (c *templateCache) Reg() {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down Expand Up @@ -64,8 +68,9 @@ func (c *templateCache) Leave() {
defer c.mu.Unlock()

c.u--
if c.u == 0 {
if c.u <= 0 {
clear(c.m)
clear(c.d)
c.u = 0
}
}
7 changes: 7 additions & 0 deletions plugins/processors/dynamic_template/dynamic_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import (
"github.com/gekatateam/neptunus/metrics"
"github.com/gekatateam/neptunus/plugins"
"github.com/gekatateam/neptunus/plugins/common/elog"
cachestats "github.com/gekatateam/neptunus/plugins/common/metrics"
cte "github.com/gekatateam/neptunus/plugins/common/template"
)

type DynamicTemplate struct {
*core.BaseProcessor `mapstructure:"-"`
EnableMetrics bool `mapstructure:"enable_metrics"`
TemplateTTL time.Duration `mapstructure:"template_ttl"`
Labels []string `mapstructure:"labels"`
Fields []string `mapstructure:"fields"`
Expand All @@ -41,6 +43,11 @@ func (p *DynamicTemplate) Run() {
clearTicker.Stop()
}

if p.EnableMetrics {
cachestats.RegisterCache(p.Pipeline, p.Alias, "templates", &cache)
defer cachestats.UnregisterCache(p.Pipeline, p.Alias, "templates")
}

MAIN_LOOP:
for {
select {
Expand Down
2 changes: 1 addition & 1 deletion plugins/processors/stats/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ This is the format of stats event:
```

> [!TIP]
> This plugin may write it's own [metrics](../../../docs/METRICS.md#stats-metrics-cache)
> This plugin may write it's own [metrics](../../../docs/METRICS.md#internal-caches)

## Configuration
```toml
Expand Down
8 changes: 4 additions & 4 deletions plugins/processors/stats/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

type cache interface {
StatsCached() int
Size() int

observe(m *metric, b map[float64]float64, v float64)
flush(out chan<- *core.Event, flushFn func(m *metric, ch chan<- *core.Event))
Expand Down Expand Up @@ -66,7 +66,7 @@ func (c individualCache) clear() {
clear(c.d)
}

func (c individualCache) StatsCached() int {
func (c individualCache) Size() int {
return len(c.c)
}

Expand Down Expand Up @@ -161,9 +161,9 @@ func (c *sharedCache) clear() {
}
}

func (c *sharedCache) StatsCached() int {
func (c *sharedCache) Size() int {
c.mu.Lock()
defer c.mu.Unlock()

return c.cache.StatsCached()
return c.cache.Size()
}
6 changes: 3 additions & 3 deletions plugins/processors/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/gekatateam/neptunus/metrics"
"github.com/gekatateam/neptunus/plugins"
"github.com/gekatateam/neptunus/plugins/common/convert"
pluginstats "github.com/gekatateam/neptunus/plugins/common/metrics"
cachestats "github.com/gekatateam/neptunus/plugins/common/metrics"
)

var noLabelsSlice = make([]metricLabel, 0)
Expand Down Expand Up @@ -108,8 +108,8 @@ func (p *Stats) SetId(id uint64) {

func (p *Stats) Run() {
if p.EnableMetrics {
pluginstats.RegisterStatsCache(p.Pipeline, p.Plugin, p.cache)
defer pluginstats.UnregisterStatsCache(p.Pipeline, p.Plugin)
cachestats.RegisterCache(p.Pipeline, p.Alias, "stats", p.cache)
defer cachestats.UnregisterCache(p.Pipeline, p.Alias, "stats")
}

flushTicker := time.NewTicker(p.Period)
Expand Down