diff --git a/.pipelines/test.pipeline.mixer.toml b/.pipelines/test.pipeline.mixer.toml index c7b70ced..4bb7c52d 100644 --- a/.pipelines/test.pipeline.mixer.toml +++ b/.pipelines/test.pipeline.mixer.toml @@ -1,7 +1,7 @@ [settings] id = "test.pipeline.mixer" lines = 8 - run = false + run = true buffer = 5 log_level = "info" @@ -15,7 +15,7 @@ # this job is used to create table partitions [[inputs.cronjob.jobs]] name = "partition.create" - schedule = "@every 5s" + schedule = "@every 1s" force = true [[processors]] @@ -42,6 +42,10 @@ def process(event): return events ''' +[[processors]] + [processors.through] +# sleep = "1s" + [[processors]] [processors.mixer] # log_level = "debug" diff --git a/core/unit/soft.go b/core/unit/soft.go index cb88f879..a33a0557 100644 --- a/core/unit/soft.go +++ b/core/unit/soft.go @@ -66,8 +66,8 @@ func (u *procSoftUnit) Run() { u.wg.Add(1) go func(f core.Filter, c chan<- *core.Event) { f.Run() // blocking call, loop inside - close(c) f.Close() + close(c) u.wg.Done() }(v.f, v.c) } @@ -136,8 +136,8 @@ func (u *outSoftUnit) Run() { u.wg.Add(1) go func(f core.Filter, c chan<- *core.Event) { f.Run() // blocking call, loop inside - close(c) f.Close() + close(c) u.wg.Done() }(v.f, v.c) } @@ -216,8 +216,8 @@ func (u *inSoftUnit) Run() { u.wg.Add(1) go func(f core.Filter, c chan<- *core.Event) { f.Run() // blocking call, loop inside - close(c) f.Close() + close(c) u.wg.Done() }(v.f, v.c) } diff --git a/docs/PLUGINS_MODEL.md b/docs/PLUGINS_MODEL.md index bd04ab88..7baba2ad 100644 --- a/docs/PLUGINS_MODEL.md +++ b/docs/PLUGINS_MODEL.md @@ -4,6 +4,38 @@ This section is for developers who want to create a new plugin. There a nine types of plugins and some of it works directly with channels (we call it streaming plugins), and some of it not (callable or child plugins). Start by looking at the interfaces that plugins must conform to and some base structs that must be embedded - [here](../core/plugin.go) and [here](../core/base.go). 
+## Registration + +Any plugin MUST be registered using one of the `Add` functions from [registry](../plugins/registry.go) and its name MUST be unique across its type. You can do it in your package `init()` func. Also, it is the place where you can set default values for your plugin configuration: +```go +func init() { + plugins.AddInput("kafka", func() core.Input { + return &Kafka{ + ClientId: "neptunus.kafka.input", + GroupId: "neptunus.kafka.input", + GroupBalancer: "range", + StartOffset: "last", + OnParserError: "drop", + GroupTTL: 24 * time.Hour, + DialTimeout: 5 * time.Second, + SessionTimeout: 30 * time.Second, + RebalanceTimeout: 30 * time.Second, + HeartbeatInterval: 3 * time.Second, + ReadBatchTimeout: 3 * time.Second, + WaitBatchTimeout: 3 * time.Second, + MaxUncommitted: 100, + CommitInterval: 1 * time.Second, + MaxBatchSize: datasize.Mebibyte, // 1 MiB, + SASL: SASL{ + Mechanism: "none", + }, + Ider: &ider.Ider{}, + TLSClientConfig: &tls.TLSClientConfig{}, + } + }) +} +``` + ## Plugin lifecycle Any plugin lifecycle is `create` -> `init` -> `set channels (for streaming plugins)` -> `run/call` -> `close`, and the engine takes care of some stages. @@ -39,6 +71,8 @@ Also, it is your responsibility to write plugin metrics. The base struct contain You can find some heplers in [plugins/common/](../plugins/common/) dir, such as [Batcher](../plugins/common/batcher/), [Retryer](../plugins/common/retryer/) or [Pool](../plugins/common/pool/). +In case of callable plugins, please remember that a plugin may be called simultaneously from multiple goroutines, so, make it concurrency-safe. + ### Close When the engine receives a signal to stop a pipeline, it calls inputs `Close() error` method. 
diff --git a/plugins/core/mixer/mixer.go b/plugins/core/mixer/mixer.go index 5bb87787..f2206f22 100644 --- a/plugins/core/mixer/mixer.go +++ b/plugins/core/mixer/mixer.go @@ -4,7 +4,6 @@ import ( "fmt" "reflect" "sync" - "sync/atomic" "time" "github.com/gekatateam/neptunus/core" @@ -16,8 +15,7 @@ var outs = &sync.Map{} type outputChans struct { mu *sync.RWMutex - c *atomic.Int32 - ch []chan<- *core.Event + ch map[int]chan<- *core.Event } type Mixer struct { @@ -45,7 +43,14 @@ func (p *Mixer) Close() error { return nil } - if curr.(outputChans).c.Add(-1) == 0 { + stored := curr.(outputChans) + stored.mu.Lock() + defer stored.mu.Unlock() + + delete(stored.ch, p.line) + p.Log.Info(fmt.Sprintf("event output chan deleted on line %v; channels total: %v", p.line, len(stored.ch))) + + if len(stored.ch) == 0 { outs.Delete(p.id) } return nil @@ -54,25 +59,23 @@ func (p *Mixer) Close() error { func (p *Mixer) SetChannels(in <-chan *core.Event, out chan<- *core.Event, _ chan<- *core.Event) { p.in = in curr, ok := outs.Load(p.id) + var stored outputChans + if !ok { - stored := outputChans{ + stored = outputChans{ mu: &sync.RWMutex{}, - c: &atomic.Int32{}, - ch: []chan<- *core.Event{out}, + ch: make(map[int]chan<- *core.Event, 5), } - stored.c.Add(1) - outs.Store(p.id, stored) - return + } else { + stored = curr.(outputChans) } - stored := curr.(outputChans) stored.mu.Lock() defer stored.mu.Unlock() - stored.c.Add(1) - stored.ch = append(stored.ch, out) - + stored.ch[p.line] = out outs.Store(p.id, stored) + p.Log.Info(fmt.Sprintf("event output chan handled on line %v; channels total: %v", p.line, len(stored.ch))) } func (p *Mixer) Run() { @@ -96,11 +99,11 @@ func (p *Mixer) Run() { }) } - stored.mu.RUnlock() chosen, _, _ := reflect.Select(cases) p.Log.Debug(fmt.Sprintf("event sent to chan %v", chosen), elog.EventGroup(e), ) + stored.mu.RUnlock() p.Observe(metrics.EventAccepted, time.Since(now)) } diff --git a/plugins/decompressors/gzip/gzip.go 
b/plugins/decompressors/gzip/gzip.go index 3447617e..2b658b31 100644 --- a/plugins/decompressors/gzip/gzip.go +++ b/plugins/decompressors/gzip/gzip.go @@ -23,8 +23,7 @@ func (c *Gzip) Close() error { } func (c *Gzip) Decompress(data []byte) ([]byte, error) { - buf := bytes.NewBuffer(make([]byte, 0, 1024)) - buf.Write(data) + buf := bytes.NewBuffer(data) var r *gzip.Reader if poolReader := readersPool.Get(); poolReader == nil { @@ -37,6 +36,8 @@ func (c *Gzip) Decompress(data []byte) ([]byte, error) { r = poolReader.(*gzip.Reader) r.Reset(buf) } + + defer readersPool.Put(r) defer r.Close() data, err := io.ReadAll(r) diff --git a/plugins/processors/through/through.go b/plugins/processors/through/through.go index 2eb379bd..fdd60746 100644 --- a/plugins/processors/through/through.go +++ b/plugins/processors/through/through.go @@ -6,6 +6,7 @@ import ( "github.com/gekatateam/neptunus/core" "github.com/gekatateam/neptunus/metrics" "github.com/gekatateam/neptunus/plugins" + "github.com/gekatateam/neptunus/plugins/common/elog" ) type Through struct { @@ -26,6 +27,9 @@ func (p *Through) Run() { now := time.Now() if p.Sleep > 0 { + p.Log.Warn("sleeping", + elog.EventGroup(e), + ) time.Sleep(p.Sleep) }