8 changes: 6 additions & 2 deletions .pipelines/test.pipeline.mixer.toml
@@ -1,7 +1,7 @@
[settings]
id = "test.pipeline.mixer"
lines = 8
run = false
run = true
buffer = 5
log_level = "info"

@@ -15,7 +15,7 @@
# this job is used to create table partitions
[[inputs.cronjob.jobs]]
name = "partition.create"
schedule = "@every 5s"
schedule = "@every 1s"
force = true

[[processors]]
@@ -42,6 +42,10 @@ def process(event):
return events
'''

[[processors]]
[processors.through]
# sleep = "1s"

[[processors]]
[processors.mixer]
# log_level = "debug"
6 changes: 3 additions & 3 deletions core/unit/soft.go
@@ -66,8 +66,8 @@ func (u *procSoftUnit) Run() {
u.wg.Add(1)
go func(f core.Filter, c chan<- *core.Event) {
f.Run() // blocking call, loop inside
close(c)
f.Close()
close(c)
u.wg.Done()
}(v.f, v.c)
}
@@ -136,8 +136,8 @@ func (u *outSoftUnit) Run() {
u.wg.Add(1)
go func(f core.Filter, c chan<- *core.Event) {
f.Run() // blocking call, loop inside
close(c)
f.Close()
close(c)
u.wg.Done()
}(v.f, v.c)
}
@@ -216,8 +216,8 @@ func (u *inSoftUnit) Run() {
u.wg.Add(1)
go func(f core.Filter, c chan<- *core.Event) {
f.Run() // blocking call, loop inside
close(c)
f.Close()
close(c)
u.wg.Done()
}(v.f, v.c)
}
34 changes: 34 additions & 0 deletions docs/PLUGINS_MODEL.md
@@ -4,6 +4,38 @@ This section is for developers who want to create a new plugin.

There are nine types of plugins; some of them work directly with channels (we call them streaming plugins) and some do not (callable or child plugins). Start by looking at the interfaces that plugins must conform to and the base structs that must be embedded - [here](../core/plugin.go) and [here](../core/base.go).
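
For orientation only, here is a rough skeleton of a streaming plugin. It uses only the methods that plugins in this repository visibly implement (`SetChannels`, `Run`, `Close`); the struct and field names are illustrative assumptions, and the authoritative interfaces and base structs are the ones linked above:
```go
// Illustrative skeleton only; a real plugin must implement the interfaces
// from core/plugin.go and embed the required base structs from core/base.go.
type Noop struct {
	in  <-chan *core.Event
	out chan<- *core.Event
}

func (p *Noop) SetChannels(in <-chan *core.Event, out chan<- *core.Event, _ chan<- *core.Event) {
	p.in, p.out = in, out
}

func (p *Noop) Run() {
	for e := range p.in { // loop until the input channel is closed
		p.out <- e // pass every event through unchanged
	}
}

func (p *Noop) Close() error {
	return nil // release any resources here
}
```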

## Registration

Any plugin MUST be registered using one of the `Add` functions from the [registry](../plugins/registry.go), and its name MUST be unique across its type. Do this in your package's `init()` func; it is also the place to set default values for your plugin configuration:
```go
func init() {
	plugins.AddInput("kafka", func() core.Input {
		return &Kafka{
			ClientId:          "neptunus.kafka.input",
			GroupId:           "neptunus.kafka.input",
			GroupBalancer:     "range",
			StartOffset:       "last",
			OnParserError:     "drop",
			GroupTTL:          24 * time.Hour,
			DialTimeout:       5 * time.Second,
			SessionTimeout:    30 * time.Second,
			RebalanceTimeout:  30 * time.Second,
			HeartbeatInterval: 3 * time.Second,
			ReadBatchTimeout:  3 * time.Second,
			WaitBatchTimeout:  3 * time.Second,
			MaxUncommitted:    100,
			CommitInterval:    1 * time.Second,
			MaxBatchSize:      datasize.Mebibyte, // 1 MiB
			SASL: SASL{
				Mechanism: "none",
			},
			Ider:            &ider.Ider{},
			TLSClientConfig: &tls.TLSClientConfig{},
		}
	})
}
```

## Plugin lifecycle

Every plugin's lifecycle is `create` -> `init` -> `set channels (for streaming plugins)` -> `run/call` -> `close`; the engine takes care of some of these stages.
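
As a sketch of that order for a streaming plugin (the interface below, the driver function, and the role of the third channel are assumptions; only `SetChannels`, `Run` and `Close` are taken from real plugin code):
```go
// Assumed driving sequence, illustrative only.
type streamingPlugin interface {
	SetChannels(in <-chan *core.Event, out chan<- *core.Event, drop chan<- *core.Event)
	Run()
	Close() error
}

func drive(p streamingPlugin, in <-chan *core.Event, out, drop chan<- *core.Event) {
	// create and init already happened: the registered factory built p
	// and the engine filled in its configuration fields.
	p.SetChannels(in, out, drop) // set channels (streaming plugins only)
	p.Run()                      // blocking; returns once the input channel is closed
	_ = p.Close()                // close: flush buffers, release connections
}
```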
@@ -39,6 +71,8 @@ Also, it is your responsibility to write plugin metrics. The base struct contain

You can find some helpers in the [plugins/common/](../plugins/common/) dir, such as [Batcher](../plugins/common/batcher/), [Retryer](../plugins/common/retryer/) or [Pool](../plugins/common/pool/).

In the case of callable plugins, remember that a plugin may be called simultaneously from multiple goroutines, so make it concurrency-safe.
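
A minimal sketch of one way to do that, guarding shared state with a mutex (a `sync.Pool`, as the gzip decompressor uses for its readers, is another common option); the plugin and method below are hypothetical and assume the `sync` and `core` packages are imported:
```go
// Hypothetical callable plugin: all shared mutable state is guarded,
// because Count may run in several goroutines at the same time.
type Counter struct {
	mu    sync.Mutex
	total int
}

func (c *Counter) Count(events ...*core.Event) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.total += len(events)
	return c.total
}
```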

### Close

When the engine receives a signal to stop a pipeline, it calls the inputs' `Close() error` method.
33 changes: 18 additions & 15 deletions plugins/core/mixer/mixer.go
@@ -4,7 +4,6 @@ import (
"fmt"
"reflect"
"sync"
"sync/atomic"
"time"

"github.com/gekatateam/neptunus/core"
@@ -16,8 +15,7 @@

type outputChans struct {
mu *sync.RWMutex
c *atomic.Int32
ch []chan<- *core.Event
ch map[int]chan<- *core.Event
}

type Mixer struct {
@@ -45,7 +43,14 @@ func (p *Mixer) Close() error {
return nil
}

if curr.(outputChans).c.Add(-1) == 0 {
stored := curr.(outputChans)
stored.mu.Lock()
defer stored.mu.Unlock()

delete(stored.ch, p.line)
p.Log.Info(fmt.Sprintf("event output chan deleted on line %v; channels total: %v", p.line, len(stored.ch)))

if len(stored.ch) == 0 {
outs.Delete(p.id)
}
return nil
@@ -54,25 +59,23 @@
func (p *Mixer) SetChannels(in <-chan *core.Event, out chan<- *core.Event, _ chan<- *core.Event) {
p.in = in
curr, ok := outs.Load(p.id)
var stored outputChans

if !ok {
stored := outputChans{
stored = outputChans{
mu: &sync.RWMutex{},
c: &atomic.Int32{},
ch: []chan<- *core.Event{out},
ch: make(map[int]chan<- *core.Event, 5),
}
stored.c.Add(1)
outs.Store(p.id, stored)
return
} else {
stored = curr.(outputChans)
}

stored := curr.(outputChans)
stored.mu.Lock()
defer stored.mu.Unlock()

stored.c.Add(1)
stored.ch = append(stored.ch, out)

stored.ch[p.line] = out
outs.Store(p.id, stored)
p.Log.Info(fmt.Sprintf("event output chan handled on line %v; channels total: %v", p.line, len(stored.ch)))
}

func (p *Mixer) Run() {
@@ -96,11 +99,11 @@ func (p *Mixer) Run() {
})
}

stored.mu.RUnlock()
chosen, _, _ := reflect.Select(cases)
p.Log.Debug(fmt.Sprintf("event sent to chan %v", chosen),
elog.EventGroup(e),
)
stored.mu.RUnlock()

p.Observe(metrics.EventAccepted, time.Since(now))
}
5 changes: 3 additions & 2 deletions plugins/decompressors/gzip/gzip.go
@@ -23,8 +23,7 @@ func (c *Gzip) Close() error {
}

func (c *Gzip) Decompress(data []byte) ([]byte, error) {
buf := bytes.NewBuffer(make([]byte, 0, 1024))
buf.Write(data)
buf := bytes.NewBuffer(data)

var r *gzip.Reader
if poolReader := readersPool.Get(); poolReader == nil {
@@ -37,6 +36,8 @@ func (c *Gzip) Decompress(data []byte) ([]byte, error) {
r = poolReader.(*gzip.Reader)
r.Reset(buf)
}

defer readersPool.Put(r)
defer r.Close()

data, err := io.ReadAll(r)
4 changes: 4 additions & 0 deletions plugins/processors/through/through.go
@@ -6,6 +6,7 @@ import (
"github.com/gekatateam/neptunus/core"
"github.com/gekatateam/neptunus/metrics"
"github.com/gekatateam/neptunus/plugins"
"github.com/gekatateam/neptunus/plugins/common/elog"
)

type Through struct {
@@ -26,6 +27,9 @@ func (p *Through) Run() {
now := time.Now()

if p.Sleep > 0 {
p.Log.Warn("sleeping",
elog.EventGroup(e),
)
time.Sleep(p.Sleep)
}
